Skip to content

Commit 2262aca

Browse files
authored
Merge branch 'master' into task/JAVA-3247
2 parents b6a5974 + 4083feb commit 2262aca

107 files changed

Lines changed: 11466 additions & 2493 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apache-kafka/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
## Apache Kafka

This module contains articles about Apache Kafka.

### Relevant articles
- [Kafka Streams vs Kafka Consumer](https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer)
- [Kafka Topic Creation Using Java](https://www.baeldung.com/kafka-topic-creation)
- [Using Kafka MockConsumer](https://www.baeldung.com/kafka-mockconsumer)
- [Using Kafka MockProducer](https://www.baeldung.com/kafka-mockproducer)
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
- [Exactly Once Processing in Kafka with Java](https://www.baeldung.com/kafka-exactly-once)

##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE.

apache-kafka/pom.xml

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>apache-kafka</artifactId>
    <name>apache-kafka</name>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <!-- Kafka client and streams APIs -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <!-- Flink + Kafka connector (commons-logging excluded in favor of SLF4J) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <!-- Test-only utilities -->
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility-proxy</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- JSON (de)serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Testcontainers for integration tests against a real broker -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>${testcontainers-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${testcontainers-jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Spark modules, provided by the runtime cluster -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
        </dependency>
        <!-- Cassandra connectors for the Spark examples -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>${com.datastax.spark.spark-cassandra-connector.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.11</artifactId>
            <version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
        </dependency>
    </dependencies>

    <properties>
        <assertj.version>3.6.2</assertj.version>
        <kafka.version>2.8.0</kafka.version>
        <testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
        <testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
        <flink.version>1.5.0</flink.version>
        <awaitility.version>3.0.0</awaitility.version>
        <guava.version>29.0-jre</guava.version>
        <org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
        <graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
        <com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
        <com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
    </properties>

</project>
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.baeldung.flink;
2+
3+
import com.baeldung.flink.model.Backup;
4+
import com.baeldung.flink.model.InputMessage;
5+
import com.baeldung.flink.operator.BackupAggregator;
6+
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
7+
import com.baeldung.flink.operator.WordsCapitalizer;
8+
import org.apache.flink.streaming.api.TimeCharacteristic;
9+
import org.apache.flink.streaming.api.datastream.DataStream;
10+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11+
import org.apache.flink.streaming.api.windowing.time.Time;
12+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
13+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
14+
15+
import static com.baeldung.flink.connector.Consumers.*;
16+
import static com.baeldung.flink.connector.Producers.*;
17+
18+
public class FlinkDataPipeline {
19+
20+
public static void capitalize() throws Exception {
21+
String inputTopic = "flink_input";
22+
String outputTopic = "flink_output";
23+
String consumerGroup = "baeldung";
24+
String address = "localhost:9092";
25+
26+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
27+
28+
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
29+
flinkKafkaConsumer.setStartFromEarliest();
30+
31+
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
32+
33+
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
34+
35+
stringInputStream.map(new WordsCapitalizer())
36+
.addSink(flinkKafkaProducer);
37+
38+
environment.execute();
39+
}
40+
41+
public static void createBackup() throws Exception {
42+
String inputTopic = "flink_input";
43+
String outputTopic = "flink_output";
44+
String consumerGroup = "baeldung";
45+
String kafkaAddress = "localhost:9092";
46+
47+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
48+
49+
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
50+
51+
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
52+
flinkKafkaConsumer.setStartFromEarliest();
53+
54+
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
55+
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
56+
57+
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
58+
59+
inputMessagesStream.timeWindowAll(Time.hours(24))
60+
.aggregate(new BackupAggregator())
61+
.addSink(flinkKafkaProducer);
62+
63+
environment.execute();
64+
}
65+
66+
public static void main(String[] args) throws Exception {
67+
createBackup();
68+
}
69+
70+
}

libraries-data-2/src/main/java/com/baeldung/flink/connector/Consumers.java renamed to apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,20 @@
99

1010
public class Consumers {
1111

12-
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
13-
String topic, String kafkaAddress, String kafkaGroup ) {
14-
Properties props = new Properties();
15-
props.setProperty("bootstrap.servers", kafkaAddress);
16-
props.setProperty("group.id",kafkaGroup);
17-
FlinkKafkaConsumer011<String> consumer =
18-
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
12+
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
13+
Properties props = new Properties();
14+
props.setProperty("bootstrap.servers", kafkaAddress);
15+
props.setProperty("group.id", kafkaGroup);
16+
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
1917

20-
return consumer;
21-
}
18+
return consumer;
19+
}
2220

23-
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
21+
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
2422
Properties properties = new Properties();
2523
properties.setProperty("bootstrap.servers", kafkaAddress);
26-
properties.setProperty("group.id",kafkaGroup);
27-
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
28-
topic, new InputMessageDeserializationSchema(),properties);
24+
properties.setProperty("group.id", kafkaGroup);
25+
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
2926

3027
return consumer;
3128
}

libraries-data-2/src/main/java/com/baeldung/flink/connector/Producers.java renamed to apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java

File renamed without changes.

libraries-data-2/src/main/java/com/baeldung/flink/model/Backup.java renamed to apache-kafka/src/main/java/com/baeldung/flink/model/Backup.java

File renamed without changes.

libraries-data-2/src/main/java/com/baeldung/flink/model/InputMessage.java renamed to apache-kafka/src/main/java/com/baeldung/flink/model/InputMessage.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public InputMessage() {
1818
public String getSender() {
1919
return sender;
2020
}
21+
2122
public void setSender(String sender) {
2223
this.sender = sender;
2324
}
@@ -55,12 +56,14 @@ public InputMessage(String sender, String recipient, LocalDateTime sentAt, Strin
5556

5657
@Override
5758
public boolean equals(Object o) {
58-
if (this == o) return true;
59-
if (o == null || getClass() != o.getClass()) return false;
59+
if (this == o)
60+
return true;
61+
if (o == null || getClass() != o.getClass())
62+
return false;
6063
InputMessage message1 = (InputMessage) o;
61-
return Objects.equal(sender, message1.sender) &&
62-
Objects.equal(recipient, message1.recipient) &&
63-
Objects.equal(sentAt, message1.sentAt) &&
64+
return Objects.equal(sender, message1.sender) &&
65+
Objects.equal(recipient, message1.recipient) &&
66+
Objects.equal(sentAt, message1.sentAt) &&
6467
Objects.equal(message, message1.message);
6568
}
6669

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.baeldung.flink.operator;
2+
3+
import com.baeldung.flink.model.Backup;
4+
import com.baeldung.flink.model.InputMessage;
5+
import org.apache.flink.api.common.functions.AggregateFunction;
6+
7+
import java.time.LocalDateTime;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
12+
@Override
13+
public List<InputMessage> createAccumulator() {
14+
return new ArrayList<>();
15+
}
16+
17+
@Override
18+
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
19+
inputMessages.add(inputMessage);
20+
return inputMessages;
21+
}
22+
23+
@Override
24+
public Backup getResult(List<InputMessage> inputMessages) {
25+
Backup backup = new Backup(inputMessages, LocalDateTime.now());
26+
return backup;
27+
}
28+
29+
@Override
30+
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
31+
inputMessages.addAll(acc1);
32+
return inputMessages;
33+
}
34+
}

libraries-data-2/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java renamed to apache-kafka/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate
1212
@Override
1313
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
1414
ZoneId zoneId = ZoneId.systemDefault();
15-
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
15+
return element.getSentAt()
16+
.atZone(zoneId)
17+
.toEpochSecond() * 1000;
1618
}
1719

1820
@Nullable

0 commit comments

Comments
 (0)