Skip to content

Commit 0203ff1

Browse files
committed
BAEL-7258: increase pom versions
1 parent 40ef28a commit 0203ff1

3 files changed

Lines changed: 38 additions & 34 deletions

File tree

apache-kafka-2/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@
6161

6262
<properties>
6363
<jna.version>5.7.0</jna.version>
64-
<kafka.version>2.8.0</kafka.version>
64+
<kafka.version>3.6.1</kafka.version>
6565
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
66-
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
66+
<testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
6767
<jackson.databind.version>2.15.2</jackson.databind.version>
6868
</properties>
6969

apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@
1212
import org.apache.kafka.clients.consumer.KafkaConsumer;
1313
import org.apache.kafka.common.serialization.StringDeserializer;
1414

15-
public class CustomKafkaListener implements Runnable, AutoCloseable {
16-
15+
public class CustomKafkaListener implements Runnable {
1716
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
18-
1917
private final String topic;
2018
private final KafkaConsumer<String, String> consumer;
21-
22-
private final AtomicBoolean running = new AtomicBoolean(false);
2319
private Consumer<String> recordConsumer;
2420

2521

@@ -50,18 +46,11 @@ public CustomKafkaListener onEach(Consumer<String> newConsumer) {
5046

5147
@Override
5248
public void run() {
53-
running.set(true);
5449
consumer.subscribe(Arrays.asList(topic));
55-
while (running.get()) {
50+
while (true) {
5651
consumer.poll(Duration.ofMillis(100))
5752
.forEach(record -> recordConsumer.accept(record.value()));
5853
}
5954
}
6055

61-
@Override
62-
public void close() {
63-
running.set(false);
64-
consumer.close();
65-
}
66-
6756
}

apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,32 @@
44
import static java.time.Duration.ofSeconds;
55
import static java.util.Arrays.asList;
66
import static org.assertj.core.api.Assertions.assertThat;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertNotNull;
79
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
810

9-
import java.util.ArrayList;
10-
import java.util.Arrays;
11-
import java.util.List;
12-
import java.util.Properties;
11+
import java.time.Duration;
12+
import java.util.*;
1313
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.ExecutionException;
15+
import java.util.concurrent.Future;
1416

17+
import org.apache.kafka.clients.admin.Admin;
18+
import org.apache.kafka.clients.admin.AdminClientConfig;
19+
import org.apache.kafka.clients.admin.NewTopic;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.KafkaConsumer;
1524
import org.apache.kafka.clients.producer.KafkaProducer;
1625
import org.apache.kafka.clients.producer.ProducerConfig;
1726
import org.apache.kafka.clients.producer.ProducerRecord;
27+
import org.apache.kafka.clients.producer.RecordMetadata;
28+
import org.apache.kafka.common.header.Header;
29+
import org.apache.kafka.common.header.Headers;
30+
import org.apache.kafka.common.serialization.StringDeserializer;
1831
import org.apache.kafka.common.serialization.StringSerializer;
32+
import org.junit.jupiter.api.BeforeAll;
1933
import org.junit.jupiter.api.Test;
2034
import org.testcontainers.containers.KafkaContainer;
2135
import org.testcontainers.junit.jupiter.Container;
@@ -30,7 +44,7 @@ class CustomKafkaListenerLiveTest {
3044
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
3145

3246
static {
33-
Awaitility.setDefaultTimeout(ofSeconds(1L));
47+
Awaitility.setDefaultTimeout(ofSeconds(5L));
3448
Awaitility.setDefaultPollInterval(ofMillis(50L));
3549
}
3650

@@ -42,33 +56,34 @@ void givenANewCustomKafkaListener_thenConsumesAllMessages() {
4256
List<String> consumedMessages = new ArrayList<>();
4357

4458
// when
45-
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add)) {
46-
CompletableFuture.runAsync(listener);
47-
}
59+
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
60+
CompletableFuture.runAsync(listener);
61+
4862
// and
4963
publishArticles(topic,
64+
"Introduction to Kafka",
65+
"Kotlin for Java Developers",
66+
"Reactive Spring Boot",
67+
"Deploying Spring Boot Applications",
68+
"Spring Security"
69+
);
70+
71+
// then
72+
await().untilAsserted(() ->
73+
assertThat(consumedMessages).containsExactlyInAnyOrder(
5074
"Introduction to Kafka",
5175
"Kotlin for Java Developers",
5276
"Reactive Spring Boot",
5377
"Deploying Spring Boot Applications",
5478
"Spring Security"
55-
);
79+
));
5680

57-
// then
58-
await().untilAsserted(() ->
59-
assertThat(consumedMessages).containsExactlyInAnyOrder(
60-
"Introduction to Kafka",
61-
"Kotlin for Java Developers",
62-
"Reactive Spring Boot",
63-
"Deploying Spring Boot Applications",
64-
"Spring Security"
65-
));
6681
}
6782

6883
private void publishArticles(String topic, String... articles) {
6984
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
7085
Arrays.stream(articles)
71-
.map(article -> new ProducerRecord<String,String>(topic, article))
86+
.map(article -> new ProducerRecord<>(topic, "key-1", article))
7287
.forEach(producer::send);
7388
}
7489
}

0 commit comments

Comments
 (0)