Skip to content

Commit bddea52

Browse files
author
Bogdan Feraru
committed
[BAEL-3890] Using Kafka MockConsumer
1 parent 1fe51ed commit bddea52

4 files changed

Lines changed: 194 additions & 0 deletions

File tree

libraries-data-2/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@
121121
<artifactId>univocity-parsers</artifactId>
122122
<version>${univocity.version}</version>
123123
</dependency>
124+
<dependency>
125+
<groupId>org.apache.kafka</groupId>
126+
<artifactId>kafka-clients</artifactId>
127+
<version>${kafka.version}</version>
128+
</dependency>
124129
<dependency>
125130
<groupId>org.awaitility</groupId>
126131
<artifactId>awaitility</artifactId>
@@ -184,6 +189,7 @@
184189
<renjin.version>RELEASE</renjin.version>
185190
<rcaller.version>3.0</rcaller.version>
186191
<rserve.version>1.8.1</rserve.version>
192+
<kafka.version>2.5.0</kafka.version>
187193
</properties>
188194

189195
<build>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.baeldung.kafka.consumer;
2+
3+
class CountryPopulation {
4+
5+
private String country;
6+
private Integer population;
7+
8+
public CountryPopulation(String country, Integer population) {
9+
this.country = country;
10+
this.population = population;
11+
}
12+
13+
public String getCountry() {
14+
return country;
15+
}
16+
17+
public void setCountry(String country) {
18+
this.country = country;
19+
}
20+
21+
public Integer getPopulation() {
22+
return population;
23+
}
24+
25+
public void setPopulation(Integer population) {
26+
this.population = population;
27+
}
28+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.baeldung.kafka.consumer;
2+
3+
import java.time.Duration;
4+
import java.util.Collections;
5+
import java.util.stream.StreamSupport;
6+
7+
import org.apache.kafka.clients.consumer.Consumer;
8+
import org.apache.kafka.clients.consumer.ConsumerRecords;
9+
import org.apache.kafka.common.TopicPartition;
10+
import org.apache.kafka.common.errors.WakeupException;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
public class CountryPopulationConsumer {
15+
16+
private static Logger logger = LoggerFactory.getLogger(CountryPopulationConsumer.class);
17+
18+
private Consumer<String, Integer> consumer;
19+
private java.util.function.Consumer<Throwable> exceptionConsumer;
20+
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
21+
22+
public CountryPopulationConsumer(
23+
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
24+
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
25+
this.consumer = consumer;
26+
this.exceptionConsumer = exceptionConsumer;
27+
this.countryPopulationConsumer = countryPopulationConsumer;
28+
}
29+
30+
void startBySubscribing(String topic) {
31+
consume(() -> consumer.subscribe(Collections.singleton(topic)));
32+
}
33+
34+
void startByAssigning(String topic, int partition) {
35+
consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
36+
}
37+
38+
private void consume(Runnable beforePollingTask) {
39+
try {
40+
beforePollingTask.run();
41+
while (true) {
42+
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
43+
StreamSupport.stream(records.spliterator(), false)
44+
.map(record -> new CountryPopulation(record.key(), record.value()))
45+
.forEach(countryPopulationConsumer);
46+
consumer.commitSync();
47+
}
48+
} catch (WakeupException e) {
49+
logger.info("Shutting down...");
50+
} catch (RuntimeException ex) {
51+
exceptionConsumer.accept(ex);
52+
} finally {
53+
consumer.close();
54+
}
55+
}
56+
57+
public void stop() {
58+
consumer.wakeup();
59+
}
60+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.baeldung.kafka.consumer;
2+
3+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
4+
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.HashMap;
8+
import java.util.List;
9+
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.apache.kafka.clients.consumer.MockConsumer;
12+
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
13+
import org.apache.kafka.common.KafkaException;
14+
import org.apache.kafka.common.TopicPartition;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
18+
class CountryPopulationConsumerUnitTest {
19+
20+
private static final String TOPIC = "topic";
21+
private static final int PARTITION = 0;
22+
23+
private CountryPopulationConsumer countryPopulationConsumer;
24+
25+
private List<CountryPopulation> updates;
26+
private Throwable pollException;
27+
28+
private MockConsumer<String, Integer> consumer;
29+
30+
@BeforeEach
31+
void setUp() {
32+
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
33+
updates = new ArrayList<>();
34+
countryPopulationConsumer = new CountryPopulationConsumer(consumer, ex -> this.pollException = ex, updates::add);
35+
}
36+
37+
@Test
38+
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
39+
// GIVEN
40+
consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
41+
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
42+
43+
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
44+
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
45+
startOffsets.put(tp, 0L);
46+
consumer.updateBeginningOffsets(startOffsets);
47+
48+
// WHEN
49+
countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);
50+
51+
// THEN
52+
assertThat(updates).hasSize(1);
53+
assertThat(consumer.closed()).isTrue();
54+
}
55+
56+
@Test
57+
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
58+
// GIVEN
59+
consumer.schedulePollTask(() -> {
60+
consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
61+
consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000));
62+
});
63+
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
64+
65+
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
66+
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
67+
startOffsets.put(tp, 0L);
68+
consumer.updateBeginningOffsets(startOffsets);
69+
70+
// WHEN
71+
countryPopulationConsumer.startBySubscribing(TOPIC);
72+
73+
// THEN
74+
assertThat(updates).hasSize(1);
75+
assertThat(consumer.closed()).isTrue();
76+
}
77+
78+
@Test
79+
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
80+
// GIVEN
81+
consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
82+
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
83+
84+
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
85+
TopicPartition tp = new TopicPartition(TOPIC, 0);
86+
startOffsets.put(tp, 0L);
87+
consumer.updateBeginningOffsets(startOffsets);
88+
89+
// WHEN
90+
countryPopulationConsumer.startBySubscribing(TOPIC);
91+
92+
// THEN
93+
assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
94+
assertThat(consumer.closed()).isTrue();
95+
}
96+
97+
private ConsumerRecord<String, Integer> record(String topic, int partition, String country, int population) {
98+
return new ConsumerRecord<>(topic, partition, 0, country, population);
99+
}
100+
}

0 commit comments

Comments
 (0)