forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_ryuk.py
More file actions
63 lines (45 loc) · 2.11 KB
/
test_ryuk.py
File metadata and controls
63 lines (45 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from time import sleep
import pytest
from pytest import MonkeyPatch
from docker import DockerClient
from docker.errors import NotFound
from testcontainers.core.config import testcontainers_config
from testcontainers.core.container import Reaper
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
@pytest.mark.inside_docker_check
def test_wait_for_reaper(monkeypatch: MonkeyPatch):
    """Check that closing the Reaper socket gets both containers removed.

    A ``hello-world`` container is started, the Reaper socket is closed by
    hand, and after a short wait neither the test container nor the Reaper's
    own container may be retrievable through the Docker client.
    """
    # Reset Reaper state before starting (presumably so this test gets its
    # own Ryuk container -- confirm against Reaper.delete_instance).
    Reaper.delete_instance()
    monkeypatch.setattr(testcontainers_config, "ryuk_reconnection_timeout", "0.1s")

    try:
        container = DockerContainer("hello-world")
        container.start()

        docker_client = container.get_docker_client().client

        container_id = container.get_wrapped_container().short_id
        reaper_id = Reaper._container.get_wrapped_container().short_id

        # Both containers must exist before the socket is closed.
        assert docker_client.containers.get(container_id) is not None
        assert docker_client.containers.get(reaper_id) is not None

        wait_for_logs(container, "Hello from Docker!")

        Reaper._socket.close()

        sleep(0.6)  # Sleep until Ryuk reaps all dangling containers. 0.5 extra seconds for good measure.

        with pytest.raises(NotFound):
            docker_client.containers.get(container_id)
        with pytest.raises(NotFound):
            docker_client.containers.get(reaper_id)
    finally:
        # Cleanup Ryuk class fields after manual Ryuk shutdown. Done in a
        # ``finally`` so a failed assertion above cannot leave stale Reaper
        # state behind for the tests that run afterwards.
        Reaper.delete_instance()
@pytest.mark.inside_docker_check
def test_container_without_ryuk(monkeypatch: MonkeyPatch):
    """With ``ryuk_disabled`` set, running a container must not create a Reaper."""
    Reaper.delete_instance()
    monkeypatch.setattr(testcontainers_config, "ryuk_disabled", True)

    with DockerContainer("hello-world") as hello_world:
        wait_for_logs(hello_world, "Hello from Docker!")
        # Checked while the container context is still open.
        current_reaper = Reaper._instance
        assert current_reaper is None
@pytest.mark.inside_docker_check
def test_ryuk_is_reused_in_same_process():
    """Two containers run back to back must see the very same Reaper instance."""
    with DockerContainer("hello-world") as first:
        wait_for_logs(first, "Hello from Docker!")

    # Capture the Reaper that is present once the first container is done.
    shared_reaper = Reaper._instance
    assert shared_reaper is not None

    with DockerContainer("hello-world") as second:
        wait_for_logs(second, "Hello from Docker!")
        # Identity check: the instance must not have been replaced.
        assert Reaper._instance is shared_reaper