33import java .util .List ;
44import java .util .Properties ;
55import java .util .concurrent .CompletableFuture ;
6+ import java .util .concurrent .ExecutionException ;
67import java .util .stream .Collectors ;
78import java .util .stream .IntStream ;
89
@@ -28,60 +29,66 @@ class VariableFetchSizeKafkaListenerLiveTest {
2829 @ Test
2930 void whenUsingDefaultConfiguration_thenProcessInBatchesOf () throws Exception {
3031 String topic = "engine.sensors.temperature" ;
31- publishSensorData (300 , topic );
32+ publishTestData (300 , topic );
3233
33- Properties props = commonConsumerProperties ();
34+ Properties props = new Properties ();
35+ props .setProperty (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA_CONTAINER .getBootstrapServers ());
36+ props .setProperty (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
37+ props .setProperty (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
38+ props .setProperty (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
3439 props .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "default_config" );
3540 KafkaConsumer <String , String > kafkaConsumer = new KafkaConsumer <>(props );
3641
3742 CompletableFuture .runAsync (
3843 new VariableFetchSizeKafkaListener (topic , kafkaConsumer )
3944 );
4045
41- Thread .sleep (10_000L );
46+ Thread .sleep (5_000L );
4247 }
4348
4449 @ Test
4550 void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling () throws Exception {
46- publishSensorData (300 , "engine.sensors.temperature" );
51+ String topic = "engine.sensors.temperature" ;
52+ publishTestData (300 , topic );
53+ Thread .sleep (1_000L );
4754
4855 // max.partition.fetch.bytes = 500 Bytes
4956 Properties fetchSize_500B = commonConsumerProperties ();
5057 fetchSize_500B .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "max_fetch_size_500B" );
51- fetchSize_500B .setProperty (ConsumerConfig .MAX_PARTITION_FETCH_BYTES_CONFIG , 500 + " " );
58+ fetchSize_500B .setProperty (ConsumerConfig .MAX_PARTITION_FETCH_BYTES_CONFIG , "500 " );
5259 CompletableFuture .runAsync (
53- new VariableFetchSizeKafkaListener ("engine.sensors.temperature" , new KafkaConsumer <>(fetchSize_500B ))
60+ new VariableFetchSizeKafkaListener (topic , new KafkaConsumer <>(fetchSize_500B ))
5461 );
5562
5663 // max.partition.fetch.bytes = 5.000 Bytes
5764 Properties fetchSize_5KB = commonConsumerProperties ();
5865 fetchSize_5KB .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "max_fetch_size_5KB" );
59- fetchSize_5KB .setProperty (ConsumerConfig .MAX_PARTITION_FETCH_BYTES_CONFIG , 5_000 + " " );
66+ fetchSize_5KB .setProperty (ConsumerConfig .MAX_PARTITION_FETCH_BYTES_CONFIG , "5000 " );
6067 CompletableFuture .runAsync (
61- new VariableFetchSizeKafkaListener ("engine.sensors.temperature" , new KafkaConsumer <>(fetchSize_5KB ))
68+ new VariableFetchSizeKafkaListener (topic , new KafkaConsumer <>(fetchSize_5KB ))
6269 );
6370
6471 Thread .sleep (10_000L );
6572 }
6673
6774 @ Test
6875 void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling () throws Exception {
69- publishSensorData (300 , "engine.sensors.temperature" , 100L );
76+ String topic = "engine.sensors.temperature" ;
77+ publishTestData (300 , topic , 100L );
7078
7179 // fetch.min.bytes = 1 byte (default)
7280 Properties minFetchSize_1B = commonConsumerProperties ();
7381 minFetchSize_1B .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "min_fetch_size_1B" );
74- minFetchSize_1B .setProperty (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 1 + "" );
7582 CompletableFuture .runAsync (
76- new VariableFetchSizeKafkaListener ("engine.sensors.temperature" , new KafkaConsumer <>(minFetchSize_1B ))
83+ new VariableFetchSizeKafkaListener (topic , new KafkaConsumer <>(minFetchSize_1B ))
7784 );
7885
7986 // fetch.min.bytes = 500 bytes
8087 Properties minFetchSize_500B = commonConsumerProperties ();
8188 minFetchSize_500B .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "mim_fetch_size_500B" );
82- minFetchSize_500B .setProperty (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 500 + " " );
89+ minFetchSize_500B .setProperty (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , "500 " );
8390 CompletableFuture .runAsync (
84- new VariableFetchSizeKafkaListener ("engine.sensors.temperature" , new KafkaConsumer <>(minFetchSize_500B ))
91+ new VariableFetchSizeKafkaListener (topic , new KafkaConsumer <>(minFetchSize_500B ))
8592 );
8693
8794 Thread .sleep (10_000L );
@@ -97,25 +104,29 @@ private static Properties commonConsumerProperties() {
97104 return props ;
98105 }
99106
100- private void publishSensorData (int measurementsCount , String topic ) {
101- publishSensorData (measurementsCount , topic , 0L );
107+ private void publishTestData (int measurementsCount , String topic ) {
108+ publishTestData (measurementsCount , topic , 0L );
102109 }
103110
104- private void publishSensorData (int measurementsCount , String topic , long delayInMillis ) {
111+ private void publishTestData (int measurementsCount , String topic , long delayInMillis ) {
105112 List <ProducerRecord <String , String >> records = IntStream .range (0 , measurementsCount )
106113 .mapToObj (__ -> new ProducerRecord <>(topic , "key1" , "temperature=255F" ))
107114 .collect (Collectors .toList ());
108115
109116 CompletableFuture .runAsync (() -> {
110117 try (KafkaProducer <String , String > producer = testKafkaProducer ()) {
111118 for (ProducerRecord <String , String > rec : records ) {
112- producer .send (rec );
119+ producer .send (rec ). get () ;
113120 sleep (delayInMillis );
114121 }
122+ } catch (ExecutionException | InterruptedException e ) {
123+ throw new RuntimeException (e );
115124 }
116125 });
117126 }
118127
128+
129+
119130 private static KafkaProducer <String , String > testKafkaProducer () {
120131 Properties props = new Properties ();
121132 props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA_CONTAINER .getBootstrapServers ());
0 commit comments