76 lines
2.5 KiB
Python
76 lines
2.5 KiB
Python
"""Integration: the cleanup primitives the start-flow trap depends on
|
|
are idempotent. The original orphan-network bug was a trap-ordering
|
|
issue; the fix moved the install earlier. The trap is only safe if
|
|
network_remove and pipelock_stop are no-ops against missing resources."""
|
|
|
|
import os
|
|
import subprocess
|
|
import unittest
|
|
|
|
from claude_bottle.network import (
|
|
network_create_egress,
|
|
network_create_internal,
|
|
network_remove,
|
|
)
|
|
from claude_bottle.pipelock import pipelock_stop
|
|
from tests._docker import skip_unless_docker
|
|
|
|
|
|
@skip_unless_docker()
class TestOrphanCleanup(unittest.TestCase):
    """Integration checks that the cleanup primitives are idempotent.

    Covers ``network_remove`` against a network that never existed, a full
    create/remove/remove-again cycle for the internal and egress networks,
    and ``pipelock_stop`` against a sidecar that does not exist.
    """

    def setUp(self):
        """Build a per-process slug and reset the created-network names."""
        # The slug embeds the current PID; every resource name used by the
        # tests below is derived from it.
        self.slug = f"cb-test-orphan-{os.getpid()}"
        # Empty string means "nothing created yet"; tearDown skips empties.
        self.internal_name = ""
        self.egress_name = ""

    def tearDown(self):
        """Remove any network a test recorded on ``self``, ignoring failures."""
        for name in (self.internal_name, self.egress_name):
            if not name:
                continue
            # Best-effort cleanup: the exit status is deliberately not
            # checked, since the test may already have removed the network.
            subprocess.run(
                ["docker", "network", "rm", name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )

    @staticmethod
    def _docker_network_names():
        """Return the network names printed by ``docker network ls``.

        Raises:
            subprocess.CalledProcessError: if the docker CLI exits non-zero.
                Without ``check=True`` a failed listing would yield an empty
                list, letting ``assertNotIn`` checks pass vacuously.
        """
        listing = subprocess.run(
            ["docker", "network", "ls", "--format", "{{.Name}}"],
            capture_output=True,
            text=True,
            check=True,
        )
        return listing.stdout.splitlines()

    def test_remove_missing_is_noop(self):
        """Removing a network that does not exist reports success."""
        # Returning True == idempotent success.
        missing = f"claude-bottle-net-{self.slug}-does-not-exist"
        self.assertTrue(network_remove(missing))

    @unittest.skipIf(
        os.environ.get("GITEA_ACTIONS") == "true",
        "skipped under act_runner: docker socket mount topology breaks "
        "in-process visibility of networks created on the host daemon",
    )
    def test_create_and_remove(self):
        """Create both networks, remove them, then remove them again."""
        # Recorded on self so tearDown can remove them if an assertion
        # below fails before the explicit removal.
        self.internal_name = network_create_internal(self.slug)
        self.egress_name = network_create_egress(self.slug)

        nets = self._docker_network_names()
        self.assertIn(self.internal_name, nets)
        self.assertIn(self.egress_name, nets)

        self.assertTrue(network_remove(self.internal_name))
        self.assertTrue(network_remove(self.egress_name))

        nets_after = self._docker_network_names()
        self.assertNotIn(self.internal_name, nets_after)
        self.assertNotIn(self.egress_name, nets_after)

        # Idempotent on already-removed.
        self.assertTrue(network_remove(self.internal_name))
        self.assertTrue(network_remove(self.egress_name))

    def test_pipelock_stop_missing_sidecar(self):
        """Stopping a sidecar that does not exist must not raise."""
        # Should not raise.
        pipelock_stop(f"missing-{self.slug}")
|
|
|
|
|
|
# Script entry point: lets this module be executed directly, in which case
# unittest discovers and runs the test cases defined above.
if __name__ == "__main__":
    unittest.main()
|