Skip to content

Commit c5892ee

Browse files
committed
BAEL-6552: small changes
1 parent 1ac6667 commit c5892ee

1 file changed

Lines changed: 28 additions & 17 deletions

File tree

apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListenerLiveTest.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.List;
44
import java.util.Properties;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.ExecutionException;
67
import java.util.stream.Collectors;
78
import java.util.stream.IntStream;
89

@@ -28,60 +29,66 @@ class VariableFetchSizeKafkaListenerLiveTest {
2829
@Test
2930
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
3031
String topic = "engine.sensors.temperature";
31-
publishSensorData(300, topic);
32+
publishTestData(300, topic);
3233

33-
Properties props = commonConsumerProperties();
34+
Properties props = new Properties();
35+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
36+
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
37+
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
38+
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
3439
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
3540
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
3641

3742
CompletableFuture.runAsync(
3843
new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
3944
);
4045

41-
Thread.sleep(10_000L);
46+
Thread.sleep(5_000L);
4247
}
4348

4449
@Test
4550
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
46-
publishSensorData(300, "engine.sensors.temperature");
51+
String topic = "engine.sensors.temperature";
52+
publishTestData(300, topic);
53+
Thread.sleep(1_000L);
4754

4855
// max.partition.fetch.bytes = 500 Bytes
4956
Properties fetchSize_500B = commonConsumerProperties();
5057
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
51-
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500 + "");
58+
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
5259
CompletableFuture.runAsync(
53-
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
60+
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
5461
);
5562

5663
// max.partition.fetch.bytes = 5.000 Bytes
5764
Properties fetchSize_5KB = commonConsumerProperties();
5865
fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
59-
fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5_000 + "");
66+
fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
6067
CompletableFuture.runAsync(
61-
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_5KB))
68+
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
6269
);
6370

6471
Thread.sleep(10_000L);
6572
}
6673

6774
@Test
6875
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
69-
publishSensorData(300, "engine.sensors.temperature", 100L);
76+
String topic = "engine.sensors.temperature";
77+
publishTestData(300, topic, 100L);
7078

7179
// fetch.min.bytes = 1 byte (default)
7280
Properties minFetchSize_1B = commonConsumerProperties();
7381
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
74-
minFetchSize_1B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1 + "");
7582
CompletableFuture.runAsync(
76-
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_1B))
83+
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
7784
);
7885

7986
// fetch.min.bytes = 500 bytes
8087
Properties minFetchSize_500B = commonConsumerProperties();
8188
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
82-
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 500 + "");
89+
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
8390
CompletableFuture.runAsync(
84-
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_500B))
91+
new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
8592
);
8693

8794
Thread.sleep(10_000L);
@@ -97,25 +104,29 @@ private static Properties commonConsumerProperties() {
97104
return props;
98105
}
99106

100-
private void publishSensorData(int measurementsCount, String topic) {
101-
publishSensorData(measurementsCount, topic, 0L);
107+
private void publishTestData(int measurementsCount, String topic) {
108+
publishTestData(measurementsCount, topic, 0L);
102109
}
103110

104-
private void publishSensorData(int measurementsCount, String topic, long delayInMillis) {
111+
private void publishTestData(int measurementsCount, String topic, long delayInMillis) {
105112
List<ProducerRecord<String, String>> records = IntStream.range(0, measurementsCount)
106113
.mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
107114
.collect(Collectors.toList());
108115

109116
CompletableFuture.runAsync(() -> {
110117
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
111118
for (ProducerRecord<String, String> rec : records) {
112-
producer.send(rec);
119+
producer.send(rec).get();
113120
sleep(delayInMillis);
114121
}
122+
} catch (ExecutionException | InterruptedException e) {
123+
throw new RuntimeException(e);
115124
}
116125
});
117126
}
118127

128+
129+
119130
private static KafkaProducer<String, String> testKafkaProducer() {
120131
Properties props = new Properties();
121132
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());

0 commit comments

Comments
 (0)