forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_basic.py
More file actions
117 lines (94 loc) · 3.69 KB
/
example_basic.py
File metadata and controls
117 lines (94 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
from datetime import datetime, timezone

import boto3

from testcontainers.aws import AwsContainer
def basic_example():
    """Exercise S3, DynamoDB, and SQS against a local AWS container.

    Starts an ``AwsContainer``, builds one set of connection settings shared
    by all three boto3 clients, and runs a small create/use/delete cycle
    against each service, printing progress along the way.  This is a
    runnable example script, not a library function.

    NOTE(review): confirm ``testcontainers.aws.AwsContainer`` exists in the
    installed testcontainers version — LocalStackContainer is the more common
    entry point; verify against the project's dependency pin.
    """
    with AwsContainer() as aws:
        host = aws.get_container_host_ip()
        port = aws.get_exposed_port(aws.port)
        # Build the connection kwargs once instead of repeating the same
        # five keyword arguments for every client.
        conn = {
            "endpoint_url": f"http://{host}:{port}",
            "aws_access_key_id": aws.access_key,
            "aws_secret_access_key": aws.secret_key,
            "region_name": aws.region,
        }

        s3 = boto3.client("s3", **conn)
        dynamodb = boto3.resource("dynamodb", **conn)
        sqs = boto3.client("sqs", **conn)
        print("Connected to AWS services")

        _demo_s3(s3)
        _demo_dynamodb(dynamodb)
        _demo_sqs(sqs)


def _demo_s3(s3):
    """Create a bucket, upload and list one object, then delete everything."""
    # datetime.utcnow() is deprecated (Python 3.12+) and naive; use an
    # aware UTC timestamp for the unique bucket suffix instead.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    bucket_name = f"test-bucket-{stamp}"
    s3.create_bucket(Bucket=bucket_name)
    print(f"\nCreated S3 bucket: {bucket_name}")

    s3.put_object(Bucket=bucket_name, Key="test.txt", Body="Hello, S3!")
    print("Uploaded test file")

    # list_objects_v2 is the current listing API; plain list_objects is legacy.
    objects = s3.list_objects_v2(Bucket=bucket_name)
    print("\nObjects in bucket:")
    for obj in objects.get("Contents", []):
        print(f"- {obj['Key']}")

    # Clean up: a bucket must be emptied before it can be deleted.
    for obj in s3.list_objects_v2(Bucket=bucket_name).get("Contents", []):
        s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
    s3.delete_bucket(Bucket=bucket_name)
    print("\nDeleted S3 bucket")


def _demo_dynamodb(dynamodb):
    """Create a table, write and scan one item, then drop the table."""
    table_name = "test_table"
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
        ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
    )
    print(f"\nCreated DynamoDB table: {table_name}")

    # create_table is asynchronous; block until the table is usable.
    table.meta.client.get_waiter("table_exists").wait(TableName=table_name)

    table.put_item(
        Item={
            "id": "1",
            "name": "Test Item",
            "value": 42,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
    print("Inserted test item")

    response = table.scan()
    print("\nDynamoDB items:")
    for item in response["Items"]:
        # DynamoDB returns numbers as decimal.Decimal, which json.dumps
        # cannot serialize by itself; default=str avoids the TypeError.
        print(json.dumps(item, indent=2, default=str))

    table.delete()
    print("Deleted DynamoDB table")


def _demo_sqs(sqs):
    """Create a queue, send and receive one message, then delete the queue."""
    queue_name = "test-queue"
    queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
    print(f"\nCreated SQS queue: {queue_name}")

    response = sqs.send_message(QueueUrl=queue_url, MessageBody="Hello, SQS!")
    print(f"Sent message: {response['MessageId']}")

    messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
    print("\nReceived messages:")
    for message in messages.get("Messages", []):
        print(json.dumps(message, indent=2))

    sqs.delete_queue(QueueUrl=queue_url)
    print("Deleted SQS queue")
if __name__ == "__main__":
basic_example()