44import static java .time .Duration .ofSeconds ;
55import static java .util .Arrays .asList ;
66import static org .assertj .core .api .Assertions .assertThat ;
7+ import static org .junit .jupiter .api .Assertions .assertEquals ;
8+ import static org .junit .jupiter .api .Assertions .assertNotNull ;
79import static org .testcontainers .shaded .org .awaitility .Awaitility .await ;
810
9- import java .util .ArrayList ;
10- import java .util .Arrays ;
11- import java .util .List ;
12- import java .util .Properties ;
11+ import java .time .Duration ;
12+ import java .util .*;
1313import java .util .concurrent .CompletableFuture ;
14+ import java .util .concurrent .ExecutionException ;
15+ import java .util .concurrent .Future ;
1416
17+ import org .apache .kafka .clients .admin .Admin ;
18+ import org .apache .kafka .clients .admin .AdminClientConfig ;
19+ import org .apache .kafka .clients .admin .NewTopic ;
20+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
21+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
22+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
23+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
1524import org .apache .kafka .clients .producer .KafkaProducer ;
1625import org .apache .kafka .clients .producer .ProducerConfig ;
1726import org .apache .kafka .clients .producer .ProducerRecord ;
27+ import org .apache .kafka .clients .producer .RecordMetadata ;
28+ import org .apache .kafka .common .header .Header ;
29+ import org .apache .kafka .common .header .Headers ;
30+ import org .apache .kafka .common .serialization .StringDeserializer ;
1831import org .apache .kafka .common .serialization .StringSerializer ;
32+ import org .junit .jupiter .api .BeforeAll ;
1933import org .junit .jupiter .api .Test ;
2034import org .testcontainers .containers .KafkaContainer ;
2135import org .testcontainers .junit .jupiter .Container ;
@@ -30,7 +44,7 @@ class CustomKafkaListenerLiveTest {
3044 private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:latest" ));
3145
3246 static {
33- Awaitility .setDefaultTimeout (ofSeconds (1L ));
47+ Awaitility .setDefaultTimeout (ofSeconds (5L ));
3448 Awaitility .setDefaultPollInterval (ofMillis (50L ));
3549 }
3650
@@ -42,33 +56,34 @@ void givenANewCustomKafkaListener_thenConsumesAllMessages() {
4256 List <String > consumedMessages = new ArrayList <>();
4357
4458 // when
45- try ( CustomKafkaListener listener = new CustomKafkaListener (topic , bootstrapServers ).onEach (consumedMessages ::add )) {
46- CompletableFuture .runAsync (listener );
47- }
59+ CustomKafkaListener listener = new CustomKafkaListener (topic , bootstrapServers ).onEach (consumedMessages ::add );
60+ CompletableFuture .runAsync (listener );
61+
4862 // and
4963 publishArticles (topic ,
64+ "Introduction to Kafka" ,
65+ "Kotlin for Java Developers" ,
66+ "Reactive Spring Boot" ,
67+ "Deploying Spring Boot Applications" ,
68+ "Spring Security"
69+ );
70+
71+ // then
72+ await ().untilAsserted (() ->
73+ assertThat (consumedMessages ).containsExactlyInAnyOrder (
5074 "Introduction to Kafka" ,
5175 "Kotlin for Java Developers" ,
5276 "Reactive Spring Boot" ,
5377 "Deploying Spring Boot Applications" ,
5478 "Spring Security"
55- );
79+ ) );
5680
57- // then
58- await ().untilAsserted (() ->
59- assertThat (consumedMessages ).containsExactlyInAnyOrder (
60- "Introduction to Kafka" ,
61- "Kotlin for Java Developers" ,
62- "Reactive Spring Boot" ,
63- "Deploying Spring Boot Applications" ,
64- "Spring Security"
65- ));
6681 }
6782
6883 private void publishArticles (String topic , String ... articles ) {
6984 try (KafkaProducer <String , String > producer = testKafkaProducer ()) {
7085 Arrays .stream (articles )
71- .map (article -> new ProducerRecord <String , String >(topic , article ))
86+ .map (article -> new ProducerRecord <>(topic , "key-1" , article ))
7287 .forEach (producer ::send );
7388 }
7489 }
0 commit comments