forked from testcontainers/testcontainers-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_kafka.py
More file actions
30 lines (22 loc) · 1.01 KB
/
test_kafka.py
File metadata and controls
30 lines (22 loc) · 1.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from testcontainers.kafka import KafkaContainer
def test_kafka_producer_consumer():
    """Round-trip one message through a KafkaContainer built with defaults."""
    with KafkaContainer() as kafka:
        produce_and_consume_kafka_message(kafka)
def test_kafka_producer_consumer_custom_port():
    """Round-trip one message through a KafkaContainer exposing port 9888."""
    custom_port = 9888
    with KafkaContainer(port_to_expose=custom_port) as kafka:
        # The container must report back the port it was constructed with.
        assert kafka.port_to_expose == custom_port
        produce_and_consume_kafka_message(kafka)
def produce_and_consume_kafka_message(container):
    """Send one message to the container's broker and verify it arrived.

    A single payload is produced to ``test-topic``; a consumer is then
    assigned to partition 0 of that topic and the partition's end offset
    is asserted to be exactly 1.

    Both clients are closed in ``finally`` blocks so that a failed send or
    a failed assertion does not leave open connections behind for the
    tests that run afterwards.

    Args:
        container: A started container object exposing
            ``get_bootstrap_server()``; the returned value is handed to
            the Kafka clients as their only bootstrap server.

    Raises:
        AssertionError: If the end offset of partition 0 is not 1.
    """
    topic = 'test-topic'
    bootstrap_server = container.get_bootstrap_server()

    producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
    try:
        producer.send(topic, b"verification message")
        # send() only queues the record; flush() blocks until it is
        # actually delivered, so the offset check below sees it.
        producer.flush()
    finally:
        producer.close()

    consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server])
    try:
        tp = TopicPartition(topic, 0)
        # Manual assignment: no consumer group is needed just to
        # inspect the offsets of a single partition.
        consumer.assign([tp])
        consumer.seek_to_beginning()
        assert consumer.end_offsets([tp])[tp] == 1, \
            "Expected exactly one test message to be present on test topic !"
    finally:
        # The original never closed the consumer, leaking its
        # connections for the remainder of the test session.
        consumer.close()