Skip to content

Commit 40ef28a

Browse files
committed
BAEL-7258: renaming
1 parent 5d13422 commit 40ef28a

2 files changed

Lines changed: 14 additions & 16 deletions

File tree

apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.apache.kafka.clients.consumer.KafkaConsumer;
1313
import org.apache.kafka.common.serialization.StringDeserializer;
1414

15-
public class CustomKafkaListener implements Runnable, Closeable {
15+
public class CustomKafkaListener implements Runnable, AutoCloseable {
1616

1717
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
1818

@@ -43,7 +43,7 @@ private static KafkaConsumer<String, String> defaultKafkaConsumer(String boostra
4343
return new KafkaConsumer<>(props);
4444
}
4545

46-
public CustomKafkaListener doForEach(Consumer<String> newConsumer) {
46+
public CustomKafkaListener onEach(Consumer<String> newConsumer) {
4747
recordConsumer = recordConsumer.andThen(newConsumer);
4848
return this;
4949
}
@@ -52,10 +52,9 @@ public CustomKafkaListener doForEach(Consumer<String> newConsumer) {
5252
public void run() {
5353
running.set(true);
5454
consumer.subscribe(Arrays.asList(topic));
55-
5655
while (running.get()) {
5756
consumer.poll(Duration.ofMillis(100))
58-
.forEach(record -> recordConsumer.accept(record.value()));
57+
.forEach(record -> recordConsumer.accept(record.value()));
5958
}
6059
}
6160

apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/KafkaListenerWithoutSpringLiveTest.java renamed to apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
88

99
import java.util.ArrayList;
10+
import java.util.Arrays;
1011
import java.util.List;
1112
import java.util.Properties;
1213
import java.util.concurrent.CompletableFuture;
@@ -23,7 +24,7 @@
2324
import org.testcontainers.utility.DockerImageName;
2425

2526
@Testcontainers
26-
class KafkaListenerWithoutSpringLiveTest {
27+
class CustomKafkaListenerLiveTest {
2728

2829
@Container
2930
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@@ -34,30 +35,28 @@ class KafkaListenerWithoutSpringLiveTest {
3435
}
3536

3637
@Test
37-
void test() {
38+
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
3839
// given
3940
String topic = "baeldung.articles.published";
4041
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
4142
List<String> consumedMessages = new ArrayList<>();
4243

4344
// when
44-
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers)) {
45-
CompletableFuture.runAsync(() ->
46-
listener.doForEach(consumedMessages::add).run()
47-
);
45+
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add)) {
46+
CompletableFuture.runAsync(listener);
4847
}
4948
// and
50-
publishArticles(topic, asList(
49+
publishArticles(topic,
5150
"Introduction to Kafka",
5251
"Kotlin for Java Developers",
5352
"Reactive Spring Boot",
5453
"Deploying Spring Boot Applications",
5554
"Spring Security"
56-
));
55+
);
5756

5857
// then
59-
await().untilAsserted(() -> assertThat(consumedMessages)
60-
.containsExactlyInAnyOrder(
58+
await().untilAsserted(() ->
59+
assertThat(consumedMessages).containsExactlyInAnyOrder(
6160
"Introduction to Kafka",
6261
"Kotlin for Java Developers",
6362
"Reactive Spring Boot",
@@ -66,9 +65,9 @@ void test() {
6665
));
6766
}
6867

69-
private void publishArticles(String topic, List<String> articles) {
68+
private void publishArticles(String topic, String... articles) {
7069
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
71-
articles.stream()
70+
Arrays.stream(articles)
7271
.map(article -> new ProducerRecord<String,String>(topic, article))
7372
.forEach(producer::send);
7473
}

0 commit comments

Comments
 (0)