2525import pytest
2626
2727
# Kafka broker address; the Kafka container is started with host networking,
# so the broker is reachable on localhost at the default port.
BOOTSTRAP_SERVER = "localhost:9092"

# Randomized topic name so repeated test runs never collide with leftover state.
TOPIC_NAME = f"topic-{uuid.uuid4()}"

# Tag applied to the locally built pipeline container image.
CONTAINER_IMAGE_NAME = "kafka-pipeline:1"
3131
3232
33- @pytest .fixture (scope = ' module' , autouse = True )
33+ @pytest .fixture (scope = " module" , autouse = True )
3434def kafka_container () -> None :
3535 # Start a containerized Kafka server.
3636 docker_client = docker .from_env ()
37- container = docker_client .containers .run ('apache/kafka:3.7.0' , network_mode = 'host' , detach = True )
37+ container = docker_client .containers .run (
38+ "apache/kafka:3.7.0" , network_mode = "host" , detach = True
39+ )
3840 try :
3941 create_topic ()
4042 yield
@@ -48,41 +50,43 @@ def create_topic() -> None:
4850 try :
4951 client = KafkaAdminClient (bootstrap_servers = BOOTSTRAP_SERVER )
5052 topics = []
51- topics .append (NewTopic (name = TOPIC_NAME , num_partitions = 1 , replication_factor = 1 ))
53+ topics .append (
54+ NewTopic (name = TOPIC_NAME , num_partitions = 1 , replication_factor = 1 )
55+ )
5256 client .create_topics (topics )
5357 break
5458 except NoBrokersAvailable :
5559 time .sleep (5 )
5660
5761
def test_read_from_kafka(tmp_path: Path) -> None:
    """End-to-end test: produce messages to Kafka, run the containerized
    pipeline, and verify the messages appear in the pipeline's output file.

    Args:
        tmp_path: pytest-provided temporary directory; mounted into the
            pipeline container as /out so the output file is visible here.
    """
    file_name_prefix = f"output-{uuid.uuid4()}"
    file_name = f"{tmp_path}/{file_name_prefix}-00000-of-00001.txt"

    # Send some messages to Kafka.
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
    for i in range(5):
        message = f"event-{i}"
        producer.send(TOPIC_NAME, message.encode())
    # send() is asynchronous: without a flush the pipeline container below can
    # start consuming before any message has reached the broker, making the
    # test flaky. Block until all buffered records are delivered.
    producer.flush()

    # Build a container image for the pipeline.
    client = docker.from_env()
    client.images.build(path="./", tag=CONTAINER_IMAGE_NAME)

    # Run the pipeline (blocks until the container exits).
    client.containers.run(
        image=CONTAINER_IMAGE_NAME,
        command=(
            f"/pipeline/read_kafka.py --output /out/{file_name_prefix} "
            f"--bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}"
        ),
        volumes=["/var/run/docker.sock:/var/run/docker.sock", f"{tmp_path}/:/out"],
        network_mode="host",
        entrypoint="python",
    )

    # Verify the pipeline wrote the Kafka messages to the output file.
    with open(file_name, "r") as f:
        text = f.read()
    for i in range(5):
        assert f"event-{i}" in text
8690
8791
8892if __name__ == "__main__" :
0 commit comments