Skip to content

Commit 5a555f7

Browse files
[BAEL-4907] - Add the tests to libraries-data-3 (#10848)
1 parent 1b2a677 commit 5a555f7

4 files changed

Lines changed: 349 additions & 0 deletions

File tree

libraries-data-3/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
## Data Libraries
2+
3+
This module contains articles about libraries for data processing in Java.
4+
5+
### Relevant articles
6+
- [Kafka Streams vs. Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
7+
- More articles: [[<-- prev]](/../libraries-data-2)
8+
9+
##### Building the project
10+
You can build the project from the command line using: *mvn clean install*, or in an IDE.

libraries-data-3/log4j.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
log4j.rootLogger=INFO, stdout

# Console appender referenced by the root logger above; without this
# definition log4j 1.x finds no appender named "stdout" and logs nothing.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1} - %m%n

libraries-data-3/pom.xml

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<artifactId>libraries-data-3</artifactId>
7+
<name>libraries-data-3</name>
8+
9+
<parent>
10+
<groupId>com.baeldung</groupId>
11+
<artifactId>parent-modules</artifactId>
12+
<version>1.0.0-SNAPSHOT</version>
13+
</parent>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>org.apache.kafka</groupId>
18+
<artifactId>kafka-clients</artifactId>
19+
<version>${kafka.version}</version>
20+
<classifier>test</classifier>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.apache.kafka</groupId>
25+
<artifactId>kafka-streams</artifactId>
26+
<version>${kafka.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.slf4j</groupId>
30+
<artifactId>slf4j-api</artifactId>
31+
<version>${slf4j.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.slf4j</groupId>
35+
<artifactId>slf4j-log4j12</artifactId>
36+
<version>${slf4j.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.assertj</groupId>
40+
<artifactId>assertj-core</artifactId>
41+
<version>${assertj.version}</version>
42+
<scope>test</scope>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.testcontainers</groupId>
46+
<artifactId>kafka</artifactId>
47+
<version>${testcontainers-kafka.version}</version>
48+
<scope>test</scope>
49+
</dependency>
50+
</dependencies>
51+
52+
<properties>
53+
<assertj.version>3.6.2</assertj.version>
54+
<slf4j.version>1.7.25</slf4j.version>
55+
<kafka.version>2.8.0</kafka.version>
56+
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
57+
</properties>
58+
59+
</project>
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
package com.baeldung.kafka.streams;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.clients.producer.KafkaProducer;
5+
import org.apache.kafka.clients.producer.ProducerRecord;
6+
import org.apache.kafka.common.serialization.Serde;
7+
import org.apache.kafka.common.serialization.Serdes;
8+
import org.apache.kafka.common.utils.Bytes;
9+
import org.apache.kafka.streams.KafkaStreams;
10+
import org.apache.kafka.streams.KeyValue;
11+
import org.apache.kafka.streams.StoreQueryParameters;
12+
import org.apache.kafka.streams.StreamsBuilder;
13+
import org.apache.kafka.streams.StreamsConfig;
14+
import org.apache.kafka.streams.Topology;
15+
import org.apache.kafka.streams.kstream.Consumed;
16+
import org.apache.kafka.streams.kstream.Grouped;
17+
import org.apache.kafka.streams.kstream.JoinWindows;
18+
import org.apache.kafka.streams.kstream.KGroupedStream;
19+
import org.apache.kafka.streams.kstream.KGroupedTable;
20+
import org.apache.kafka.streams.kstream.KStream;
21+
import org.apache.kafka.streams.kstream.KTable;
22+
import org.apache.kafka.streams.kstream.Materialized;
23+
import org.apache.kafka.streams.kstream.Produced;
24+
import org.apache.kafka.streams.kstream.TimeWindows;
25+
import org.apache.kafka.streams.state.KeyValueIterator;
26+
import org.apache.kafka.streams.state.KeyValueStore;
27+
import org.apache.kafka.streams.state.QueryableStoreTypes;
28+
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
29+
import org.apache.kafka.streams.state.StoreBuilder;
30+
import org.apache.kafka.streams.state.Stores;
31+
import org.apache.kafka.streams.state.WindowStore;
32+
import org.junit.Before;
33+
import org.junit.ClassRule;
34+
import org.junit.Test;
35+
import org.testcontainers.containers.KafkaContainer;
36+
import org.testcontainers.utility.DockerImageName;
37+
38+
import java.time.Duration;
39+
import java.util.Arrays;
40+
import java.util.Locale;
41+
import java.util.Properties;
42+
43+
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
44+
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
45+
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
46+
47+
public class KafkaStreamsLiveTest {
48+
private final String LEFT_TOPIC = "left-stream-topic";
49+
private final String RIGHT_TOPIC = "right-stream-topic";
50+
private final String LEFT_RIGHT_TOPIC = "left-right-stream-topic";
51+
52+
private KafkaProducer<String, String> producer = createKafkaProducer();
53+
private Properties streamsConfiguration = new Properties();
54+
55+
static final String TEXT_LINES_TOPIC = "TextLinesTopic";
56+
57+
private final String TEXT_EXAMPLE_1 = "test test and test";
58+
private final String TEXT_EXAMPLE_2 = "test filter filter this sentence";
59+
60+
@ClassRule
61+
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
62+
63+
@Before
64+
public void setUp() {
65+
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
66+
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
67+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
68+
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
69+
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
70+
}
71+
72+
@Test
73+
public void shouldTestKafkaTableLatestWord() throws InterruptedException {
74+
String inputTopic = "topicTable";
75+
76+
final StreamsBuilder builder = new StreamsBuilder();
77+
78+
KTable<String, String> textLinesTable = builder.table(inputTopic,
79+
Consumed.with(Serdes.String(), Serdes.String()));
80+
81+
textLinesTable.toStream().foreach((word, count) -> System.out.println("Latest word: " + word + " -> " + count));
82+
83+
final Topology topology = builder.build();
84+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-word-id");
85+
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
86+
87+
streams.cleanUp();
88+
streams.start();
89+
producer.send(new ProducerRecord<String, String>(inputTopic, "1", TEXT_EXAMPLE_1));
90+
producer.send(new ProducerRecord<String, String>(inputTopic, "2", TEXT_EXAMPLE_2));
91+
92+
Thread.sleep(2000);
93+
streams.close();
94+
}
95+
96+
@Test
97+
public void shouldTestWordCountKafkaStreams() throws InterruptedException {
98+
String wordCountTopic = "wordCountTopic";
99+
100+
final StreamsBuilder builder = new StreamsBuilder();
101+
KStream<String, String> textLines = builder.stream(wordCountTopic,
102+
Consumed.with(Serdes.String(), Serdes.String()));
103+
104+
KTable<String, Long> wordCounts = textLines
105+
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT)
106+
.split("\\W+")))
107+
.groupBy((key, word) -> word)
108+
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
109+
110+
wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
111+
112+
wordCounts.toStream().to("outputTopic",
113+
Produced.with(Serdes.String(), Serdes.Long()));
114+
115+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-table-id");
116+
final Topology topology = builder.build();
117+
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
118+
119+
streams.cleanUp();
120+
streams.start();
121+
122+
producer.send(new ProducerRecord<String, String>(wordCountTopic, "1", TEXT_EXAMPLE_1));
123+
producer.send(new ProducerRecord<String, String>(wordCountTopic, "2", TEXT_EXAMPLE_2));
124+
125+
Thread.sleep(2000);
126+
streams.close();
127+
}
128+
129+
// Filter, map
130+
@Test
131+
public void shouldTestStatelessTransformations() throws InterruptedException {
132+
String wordCountTopic = "wordCountTopic";
133+
134+
//when
135+
final StreamsBuilder builder = new StreamsBuilder();
136+
KStream<String, String> textLines = builder.stream(wordCountTopic,
137+
Consumed.with(Serdes.String(), Serdes.String()));
138+
139+
final KStream<String, String> textLinesUpperCase =
140+
textLines
141+
.map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
142+
.filter((key, value) -> value.contains("FILTER"));
143+
144+
KTable<String, Long> wordCounts = textLinesUpperCase
145+
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
146+
.groupBy((key, word) -> word)
147+
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
148+
149+
wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
150+
151+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-filter-map-id");
152+
final Topology topology = builder.build();
153+
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
154+
155+
streams.cleanUp();
156+
streams.start();
157+
158+
producer.send(new ProducerRecord<String, String>(wordCountTopic, "1", TEXT_EXAMPLE_1));
159+
producer.send(new ProducerRecord<String, String>(wordCountTopic, "2", TEXT_EXAMPLE_2));
160+
161+
Thread.sleep(2000);
162+
streams.close();
163+
164+
}
165+
166+
@Test
167+
public void shouldTestAggregationStatefulTransformations() throws InterruptedException {
168+
String aggregationTopic = "aggregationTopic";
169+
170+
final StreamsBuilder builder = new StreamsBuilder();
171+
final KStream<byte[], String> input = builder.stream(aggregationTopic,
172+
Consumed.with(Serdes.ByteArray(), Serdes.String()));
173+
final KTable<String, Long> aggregated = input
174+
.groupBy((key, value) -> (value != null && value.length() > 0) ? value.substring(0, 2).toLowerCase() : "",
175+
Grouped.with(Serdes.String(), Serdes.String()))
176+
.aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
177+
Materialized.with(Serdes.String(), Serdes.Long()));
178+
179+
aggregated.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
180+
181+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-id");
182+
final Topology topology = builder.build();
183+
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
184+
185+
streams.cleanUp();
186+
streams.start();
187+
188+
producer.send(new ProducerRecord<String, String>(aggregationTopic, "1", "one"));
189+
producer.send(new ProducerRecord<String, String>(aggregationTopic, "2", "two"));
190+
producer.send(new ProducerRecord<String, String>(aggregationTopic, "3", "three"));
191+
producer.send(new ProducerRecord<String, String>(aggregationTopic, "4", "four"));
192+
producer.send(new ProducerRecord<String, String>(aggregationTopic, "5", "five"));
193+
194+
Thread.sleep(5000);
195+
streams.close();
196+
197+
}
198+
199+
@Test
200+
public void shouldTestWindowingJoinStatefulTransformations() throws InterruptedException {
201+
final StreamsBuilder builder = new StreamsBuilder();
202+
203+
KStream<String, String> leftSource = builder.stream(LEFT_TOPIC);
204+
KStream<String, String> rightSource = builder.stream(RIGHT_TOPIC);
205+
206+
KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
207+
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
208+
JoinWindows.of(Duration.ofSeconds(5)))
209+
.groupByKey()
210+
.reduce(((key, lastValue) -> lastValue))
211+
.toStream();
212+
213+
leftRightSource.foreach((key, value) -> System.out.println("(key= " + key + ") -> (" + value + ")"));
214+
215+
final Topology topology = builder.build();
216+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-join-id");
217+
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
218+
219+
streams.cleanUp();
220+
streams.start();
221+
222+
producer.send(new ProducerRecord<String, String>(LEFT_TOPIC, "1", "left"));
223+
producer.send(new ProducerRecord<String, String>(RIGHT_TOPIC, "2", "right"));
224+
225+
Thread.sleep(2000);
226+
streams.close();
227+
}
228+
229+
@Test
230+
public void shouldTestWordCountWithInteractiveQueries() throws InterruptedException {
231+
232+
final Serde<String> stringSerde = Serdes.String();
233+
final StreamsBuilder builder = new StreamsBuilder();
234+
final KStream<String, String>
235+
textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
236+
237+
final KGroupedStream<String, String> groupedByWord = textLines
238+
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
239+
.groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));
240+
241+
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
242+
.withValueSerde(Serdes.Long()));
243+
244+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-interactive-queries");
245+
246+
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
247+
streams.cleanUp();
248+
streams.start();
249+
250+
producer.send(new ProducerRecord<String, String>(TEXT_LINES_TOPIC, "1", TEXT_EXAMPLE_1));
251+
producer.send(new ProducerRecord<String, String>(TEXT_LINES_TOPIC, "2", TEXT_EXAMPLE_2));
252+
253+
Thread.sleep(2000);
254+
ReadOnlyKeyValueStore<String, Long> keyValueStore =
255+
streams.store(StoreQueryParameters.fromNameAndType(
256+
"WordCountsStore", QueryableStoreTypes.keyValueStore()));
257+
258+
KeyValueIterator<String, Long> range = keyValueStore.all();
259+
while (range.hasNext()) {
260+
KeyValue<String, Long> next = range.next();
261+
System.out.println("Count for " + next.key + ": " + next.value);
262+
}
263+
264+
streams.close();
265+
}
266+
267+
private static KafkaProducer<String, String> createKafkaProducer() {
268+
269+
Properties props = new Properties();
270+
props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
271+
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
272+
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
273+
274+
return new KafkaProducer(props);
275+
276+
}
277+
}
278+
279+

0 commit comments

Comments
 (0)