Skip to content

Commit f53e2c3

Browse files
committed
Kafka datastream.
1 parent e336cd6 commit f53e2c3

3 files changed

Lines changed: 133 additions & 0 deletions

File tree

data_streams/pom.xml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ds</groupId>
    <artifactId>ds</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Flink 1.8 targets Java 8. Without an explicit level the compiler plugin
             falls back to its own (much older) default, which recent JDKs reject. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Make the build independent of the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump all Flink artifacts together. -->
        <flink.version>1.8.0</flink.version>
    </properties>

    <dependencies>
        <!-- Kafka source connector used by KafkaStream (Scala 2.12 build). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- JSON parsing of the incoming Kafka records. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- "provided": available at compile time but expected to be supplied by the
             runtime the job is deployed to, so it is not bundled with the artifact. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package tutorial;
2+
3+
/*
4+
Consume Kafka data stream using Flink.
5+
*/
6+
7+
import com.google.gson.Gson;
8+
import org.apache.flink.api.common.functions.FlatMapFunction;
9+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
10+
import org.apache.flink.api.java.tuple.Tuple2;
11+
import org.apache.flink.api.java.utils.ParameterTool;
12+
import org.apache.flink.streaming.api.datastream.DataStream;
13+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
14+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
15+
import org.apache.flink.util.Collector;
16+
import java.util.ArrayList;
17+
import java.util.Properties;
18+
19+
public class KafkaStream {
20+
21+
// used to parse JSON
22+
final static Gson gson = new Gson();
23+
24+
public static void main(String[] args) throws Exception {
25+
26+
// returns the execution environment (the context 'Local or Remote' in which a program is executed)
27+
// LocalEnvironment will cause execution in the current JVM
28+
// RemoteEnvironment will cause execution on a remote setup
29+
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
30+
31+
// provides utility methods for reading and parsing the program arguments
32+
// in this tutorial we will have to provide the input file and the output file as arguments
33+
final ParameterTool parameters = ParameterTool.fromArgs(args);
34+
35+
// register parameters globally so it can be available for each node in the cluster
36+
environment.getConfig().setGlobalJobParameters(parameters);
37+
38+
// set properties for kafka
39+
Properties properties = new Properties();
40+
properties.setProperty("bootstrap.servers", "localhost:9092"); // IP address where Kafka is running
41+
42+
// pull datastreams from kafka to flink's datastream
43+
// must specify topic name, deserializer, properties
44+
DataStream<String> kafkaData = environment.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
45+
46+
// keyword count from stream and saves to textfile
47+
DataStream<Tuple2<String, Integer>> result = kafkaData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
48+
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
49+
50+
// convert each line/json object to Publication
51+
Publication publication = gson.fromJson(value, Publication.class);
52+
53+
// get all keywords
54+
ArrayList<String> keywords = publication.getKeywords();
55+
56+
if(keywords == null){
57+
return;
58+
}
59+
60+
for (String keyword : keywords) {
61+
out.collect(new Tuple2<String, Integer>(keyword, 1));
62+
}
63+
}
64+
});
65+
66+
result.keyBy(0).sum(1).writeAsText(parameters.get("output"));
67+
environment.execute("Kafka stream keyword count");
68+
}
69+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package tutorial;
2+
3+
import java.util.ArrayList;
4+
5+
/**
 * Plain data holder for one publication record: a title and its list of keywords.
 *
 * <p>Instances are created from JSON via Gson in {@code KafkaStream}. Gson binds JSON
 * properties to fields by name, so the field names {@code title} and {@code keywords}
 * must stay in sync with the keys of the incoming JSON. Either field is {@code null}
 * until it has been set.
 */
public class Publication {

    private String title;
    private ArrayList<String> keywords;

    /**
     * @return the publication title, or {@code null} if none has been set
     */
    public String getTitle() {
        return title;
    }

    /**
     * @param title the publication title
     */
    public void setTitle(String title) {
        this.title = title;
    }

    /**
     * @return the keyword list exactly as stored (not a copy), or {@code null} if none
     *         has been set
     */
    public ArrayList<String> getKeywords() {
        return keywords;
    }

    /**
     * @param keywords the keyword list; the reference is stored as-is, not copied
     */
    public void setKeywords(ArrayList<String> keywords) {
        this.keywords = keywords;
    }
}

0 commit comments

Comments
 (0)