1+ package com .baeldung .kafka .consumer ;
2+
3+ import static org .assertj .core .api .AssertionsForInterfaceTypes .assertThat ;
4+
5+ import java .util .ArrayList ;
6+ import java .util .Collections ;
7+ import java .util .HashMap ;
8+ import java .util .List ;
9+
10+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
11+ import org .apache .kafka .clients .consumer .MockConsumer ;
12+ import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
13+ import org .apache .kafka .common .KafkaException ;
14+ import org .apache .kafka .common .TopicPartition ;
15+ import org .junit .jupiter .api .BeforeEach ;
16+ import org .junit .jupiter .api .Test ;
17+
18+ class CountryPopulationConsumerUnitTest {
19+
20+ private static final String TOPIC = "topic" ;
21+ private static final int PARTITION = 0 ;
22+
23+ private CountryPopulationConsumer countryPopulationConsumer ;
24+
25+ private List <CountryPopulation > updates ;
26+ private Throwable pollException ;
27+
28+ private MockConsumer <String , Integer > consumer ;
29+
30+ @ BeforeEach
31+ void setUp () {
32+ consumer = new MockConsumer <>(OffsetResetStrategy .EARLIEST );
33+ updates = new ArrayList <>();
34+ countryPopulationConsumer = new CountryPopulationConsumer (consumer , ex -> this .pollException = ex , updates ::add );
35+ }
36+
37+ @ Test
38+ void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly () {
39+ // GIVEN
40+ consumer .schedulePollTask (() -> consumer .addRecord (record (TOPIC , PARTITION , "Romania" , 19_410_000 )));
41+ consumer .schedulePollTask (() -> countryPopulationConsumer .stop ());
42+
43+ HashMap <TopicPartition , Long > startOffsets = new HashMap <>();
44+ TopicPartition tp = new TopicPartition (TOPIC , PARTITION );
45+ startOffsets .put (tp , 0L );
46+ consumer .updateBeginningOffsets (startOffsets );
47+
48+ // WHEN
49+ countryPopulationConsumer .startByAssigning (TOPIC , PARTITION );
50+
51+ // THEN
52+ assertThat (updates ).hasSize (1 );
53+ assertThat (consumer .closed ()).isTrue ();
54+ }
55+
56+ @ Test
57+ void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly () {
58+ // GIVEN
59+ consumer .schedulePollTask (() -> {
60+ consumer .rebalance (Collections .singletonList (new TopicPartition (TOPIC , 0 )));
61+ consumer .addRecord (record (TOPIC , PARTITION , "Romania" , 19_410_000 ));
62+ });
63+ consumer .schedulePollTask (() -> countryPopulationConsumer .stop ());
64+
65+ HashMap <TopicPartition , Long > startOffsets = new HashMap <>();
66+ TopicPartition tp = new TopicPartition (TOPIC , PARTITION );
67+ startOffsets .put (tp , 0L );
68+ consumer .updateBeginningOffsets (startOffsets );
69+
70+ // WHEN
71+ countryPopulationConsumer .startBySubscribing (TOPIC );
72+
73+ // THEN
74+ assertThat (updates ).hasSize (1 );
75+ assertThat (consumer .closed ()).isTrue ();
76+ }
77+
78+ @ Test
79+ void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly () {
80+ // GIVEN
81+ consumer .schedulePollTask (() -> consumer .setPollException (new KafkaException ("poll exception" )));
82+ consumer .schedulePollTask (() -> countryPopulationConsumer .stop ());
83+
84+ HashMap <TopicPartition , Long > startOffsets = new HashMap <>();
85+ TopicPartition tp = new TopicPartition (TOPIC , 0 );
86+ startOffsets .put (tp , 0L );
87+ consumer .updateBeginningOffsets (startOffsets );
88+
89+ // WHEN
90+ countryPopulationConsumer .startBySubscribing (TOPIC );
91+
92+ // THEN
93+ assertThat (pollException ).isInstanceOf (KafkaException .class ).hasMessage ("poll exception" );
94+ assertThat (consumer .closed ()).isTrue ();
95+ }
96+
97+ private ConsumerRecord <String , Integer > record (String topic , int partition , String country , int population ) {
98+ return new ConsumerRecord <>(topic , partition , 0 , country , population );
99+ }
100+ }
0 commit comments