forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_basic.py
More file actions
84 lines (66 loc) · 2.61 KB
/
example_basic.py
File metadata and controls
84 lines (66 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from datetime import timedelta
import redis
from testcontainers.redis import RedisContainer
def basic_example():
with RedisContainer() as redis_container:
# Get connection parameters
host = redis_container.get_container_host_ip()
port = redis_container.get_exposed_port(redis_container.port)
# Create Redis client
client = redis.Redis(host=host, port=port, decode_responses=True)
print("Connected to Redis")
# String operations
client.set("greeting", "Hello, Redis!")
value = client.get("greeting")
print(f"\nString value: {value}")
# List operations
client.lpush("tasks", "task1", "task2", "task3")
tasks = client.lrange("tasks", 0, -1)
print("\nTasks list:")
for task in tasks:
print(f"- {task}")
# Set operations
client.sadd("tags", "python", "redis", "docker", "testing")
tags = client.smembers("tags")
print("\nTags set:")
for tag in tags:
print(f"- {tag}")
# Hash operations
user_data = {"name": "John Doe", "email": "john@example.com", "age": "30"}
client.hset("user:1", mapping=user_data)
user = client.hgetall("user:1")
print("\nUser hash:")
for field, value in user.items():
print(f"{field}: {value}")
# Sorted set operations
scores = {"player1": 100, "player2": 200, "player3": 150}
client.zadd("leaderboard", scores)
leaderboard = client.zrevrange("leaderboard", 0, -1, withscores=True)
print("\nLeaderboard:")
for player, score in leaderboard:
print(f"{player}: {score}")
# Key expiration
client.setex("temp_key", timedelta(seconds=10), "This will expire")
ttl = client.ttl("temp_key")
print(f"\nTemp key TTL: {ttl} seconds")
# Pipeline operations
with client.pipeline() as pipe:
pipe.set("pipeline_key1", "value1")
pipe.set("pipeline_key2", "value2")
pipe.set("pipeline_key3", "value3")
pipe.execute()
print("\nPipeline operations completed")
# Pub/Sub operations
pubsub = client.pubsub()
pubsub.subscribe("test_channel")
# Publish a message
client.publish("test_channel", "Hello from Redis!")
# Get the message
message = pubsub.get_message()
if message and message["type"] == "message":
print(f"\nReceived message: {message['data']}")
# Clean up
pubsub.unsubscribe()
pubsub.close()
if __name__ == "__main__":
basic_example()