forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_redis.py
More file actions
28 lines (23 loc) · 763 Bytes
/
test_redis.py
File metadata and controls
28 lines (23 loc) · 763 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
from testcontainers.redis import RedisContainer
def test_docker_run_redis():
    """Round-trip one pub/sub message via a client from ``RedisContainer``.

    Subscribes to the ``test`` channel, publishes ``'new_msg'`` on it and
    checks that the message picked up by the subscriber carries that payload.
    """
    config = RedisContainer()
    with config as redis:
        client = redis.get_client()
        p = client.pubsub()
        p.subscribe('test')
        client.publish('test', 'new_msg')
        msg = wait_for_message(p)
        # wait_for_message returns None on timeout; fail with a readable
        # assertion instead of a TypeError from ``'data' in None``.
        assert msg is not None, "no message received on channel 'test'"
        assert 'data' in msg
        # The previous form ``assert b'new_msg', msg['data']`` only asserted
        # a truthy literal and could never fail; compare the payload instead.
        # NOTE(review): assumes the client returns undecoded bytes — confirm.
        assert msg['data'] == b'new_msg'
def wait_for_message(pubsub, timeout=1, ignore_subscribe_messages=True):
    """Poll *pubsub* until a message arrives or *timeout* seconds elapse.

    Args:
        pubsub: Object exposing ``get_message(ignore_subscribe_messages=...)``.
        timeout: Maximum number of seconds to keep polling.
        ignore_subscribe_messages: Forwarded unchanged to ``get_message``.

    Returns:
        The first non-``None`` value returned by ``get_message``, or ``None``
        if the deadline passes first.
    """
    # Use a monotonic clock so wall-clock adjustments cannot shorten or
    # stretch the wait.
    now = time.monotonic()
    deadline = now + timeout
    while now < deadline:
        message = pubsub.get_message(
            ignore_subscribe_messages=ignore_subscribe_messages)
        if message is not None:
            return message
        # Short pause between polls to avoid a busy loop.
        time.sleep(0.01)
        now = time.monotonic()
    return None