77import static org .testcontainers .shaded .org .awaitility .Awaitility .await ;
88
99import java .util .ArrayList ;
10+ import java .util .Arrays ;
1011import java .util .List ;
1112import java .util .Properties ;
1213import java .util .concurrent .CompletableFuture ;
2324import org .testcontainers .utility .DockerImageName ;
2425
2526@ Testcontainers
26- class KafkaListenerWithoutSpringLiveTest {
27+ class CustomKafkaListenerLiveTest {
2728
2829 @ Container
2930 private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:latest" ));
@@ -34,30 +35,28 @@ class KafkaListenerWithoutSpringLiveTest {
3435 }
3536
3637 @ Test
37- void test () {
38+ void givenANewCustomKafkaListener_thenConsumesAllMessages () {
3839 // given
3940 String topic = "baeldung.articles.published" ;
4041 String bootstrapServers = KAFKA_CONTAINER .getBootstrapServers ();
4142 List <String > consumedMessages = new ArrayList <>();
4243
4344 // when
44- try (CustomKafkaListener listener = new CustomKafkaListener (topic , bootstrapServers )) {
45- CompletableFuture .runAsync (() ->
46- listener .doForEach (consumedMessages ::add ).run ()
47- );
45+ try (CustomKafkaListener listener = new CustomKafkaListener (topic , bootstrapServers ).onEach (consumedMessages ::add )) {
46+ CompletableFuture .runAsync (listener );
4847 }
4948 // and
50- publishArticles (topic , asList (
49+ publishArticles (topic ,
5150 "Introduction to Kafka" ,
5251 "Kotlin for Java Developers" ,
5352 "Reactive Spring Boot" ,
5453 "Deploying Spring Boot Applications" ,
5554 "Spring Security"
56- )) ;
55+ );
5756
5857 // then
59- await ().untilAsserted (() -> assertThat ( consumedMessages )
60- .containsExactlyInAnyOrder (
58+ await ().untilAsserted (() ->
59+ assertThat ( consumedMessages ) .containsExactlyInAnyOrder (
6160 "Introduction to Kafka" ,
6261 "Kotlin for Java Developers" ,
6362 "Reactive Spring Boot" ,
@@ -66,9 +65,9 @@ void test() {
6665 ));
6766 }
6867
69- private void publishArticles (String topic , List < String > articles ) {
68+ private void publishArticles (String topic , String ... articles ) {
7069 try (KafkaProducer <String , String > producer = testKafkaProducer ()) {
71- articles .stream ()
70+ Arrays .stream (articles )
7271 .map (article -> new ProducerRecord <String ,String >(topic , article ))
7372 .forEach (producer ::send );
7473 }
0 commit comments