Skip to content

Commit 2b92640

Browse files
committed
feat: 更新示例
1 parent 95ed70e commit 2b92640

8 files changed

Lines changed: 635 additions & 0 deletions

File tree

codes/flink/pom.xml

Lines changed: 70 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,70 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>2.6.3</version>
10+
</parent>
11+
12+
<groupId>io.github.dunwu.bigdata</groupId>
13+
<artifactId>flink</artifactId>
14+
<name>大数据 - Flink</name>
15+
<version>1.0.0</version>
16+
<packaging>jar</packaging>
17+
18+
<properties>
19+
<flink.version>1.14.3</flink.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.apache.flink</groupId>
25+
<artifactId>flink-java</artifactId>
26+
<version>${flink.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.flink</groupId>
30+
<artifactId>flink-core</artifactId>
31+
<version>${flink.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.apache.flink</groupId>
35+
<artifactId>flink-clients_2.12</artifactId>
36+
<version>${flink.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.flink</groupId>
40+
<artifactId>flink-streaming-java_2.12</artifactId>
41+
<version>${flink.version}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
46+
<version>${flink.version}</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.apache.flink</groupId>
50+
<artifactId>flink-table-planner_2.12</artifactId>
51+
<version>${flink.version}</version>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.apache.flink</groupId>
55+
<artifactId>flink-test-utils_2.12</artifactId>
56+
<version>${flink.version}</version>
57+
<scope>test</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-starter</artifactId>
63+
</dependency>
64+
<dependency>
65+
<groupId>org.springframework.boot</groupId>
66+
<artifactId>spring-boot-starter-test</artifactId>
67+
<scope>test</scope>
68+
</dependency>
69+
</dependencies>
70+
</project>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.github.dunwu.bigdata.flink;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.tuple.Tuple2;
5+
import org.apache.flink.util.Collector;
6+
7+
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
8+
9+
@Override
10+
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
11+
// normalize and split the line into words
12+
String[] tokens = value.toLowerCase().split("\\W+");
13+
14+
// emit the pairs
15+
for (String token : tokens) {
16+
if (token.length() > 0) {
17+
out.collect(new Tuple2<String, Integer>(token, 1));
18+
}
19+
}
20+
}
21+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.github.dunwu.bigdata.flink;
2+
3+
import org.apache.flink.api.java.DataSet;
4+
import org.apache.flink.api.java.ExecutionEnvironment;
5+
import org.apache.flink.api.java.aggregation.Aggregations;
6+
import org.apache.flink.api.java.tuple.Tuple2;
7+
8+
public class WordCount {
9+
10+
public static void main(String[] args) throws Exception {
11+
12+
// 设置运行环境
13+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
14+
15+
// 配置数据源
16+
// you can also use env.readTextFile(...) to get words
17+
DataSet<String> text = env.fromElements("To be, or not to be,--that is the question:--",
18+
"Whether 'tis nobler in the mind to suffer",
19+
"The slings and arrows of outrageous fortune",
20+
"Or to take arms against a sea of troubles,");
21+
22+
// 进行一系列转换
23+
DataSet<Tuple2<String, Integer>> counts =
24+
// split up the lines in pairs (2-tuples) containing: (word,1)
25+
text.flatMap(new LineSplitter())
26+
// group by the tuple field "0" and sum up tuple field "1"
27+
.groupBy(0).aggregate(Aggregations.SUM, 1);
28+
29+
// emit result
30+
counts.print();
31+
}
32+
33+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.github.dunwu.bigdata.flink;
2+
3+
import org.apache.flink.api.java.tuple.Tuple2;
4+
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
5+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
6+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7+
import org.apache.flink.util.Collector;
8+
9+
public class WordCountStreaming {
10+
11+
public static void main(String[] args) throws Exception {
12+
13+
// 设置运行环境
14+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
15+
16+
// 配置数据源
17+
DataStreamSource<String> source = env.fromElements("To be, or not to be,--that is the question:--",
18+
"Whether 'tis nobler in the mind to suffer",
19+
"The slings and arrows of outrageous fortune",
20+
"Or to take arms against a sea of troubles");
21+
22+
// 进行一系列转换
23+
source
24+
// split up the lines in pairs (2-tuples) containing: (word,1)
25+
.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
26+
// emit the pairs
27+
for (String token : value.toLowerCase().split("\\W+")) {
28+
if (token.length() > 0) {
29+
out.collect(new Tuple2<>(token, 1));
30+
}
31+
}
32+
})
33+
// due to type erasure, we need to specify the return type
34+
.returns(TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class))
35+
// group by the tuple field "0"
36+
.keyBy(0)
37+
// sum up tuple on field "1"
38+
.sum(1)
39+
// print the result
40+
.print();
41+
42+
// 提交执行
43+
env.execute();
44+
}
45+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package io.github.dunwu.bigdata.flink.streaming.socket;
20+
21+
import org.apache.flink.api.common.functions.FlatMapFunction;
22+
import org.apache.flink.api.common.functions.ReduceFunction;
23+
import org.apache.flink.api.java.utils.ParameterTool;
24+
import org.apache.flink.streaming.api.datastream.DataStream;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
27+
import org.apache.flink.streaming.api.windowing.time.Time;
28+
import org.apache.flink.util.Collector;
29+
30+
/**
31+
* Implements a streaming windowed version of the "WordCount" program.
32+
*
33+
* <p>This program connects to a server socket and reads strings from the socket. The easiest way to
34+
* try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
35+
*
36+
* <pre>
37+
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
38+
* </pre>
39+
*
40+
* <p>and run this example with the hostname and the port as arguments.
41+
*/
42+
@SuppressWarnings("serial")
43+
public class SocketWindowWordCount {
44+
45+
public static void main(String[] args) throws Exception {
46+
47+
// the host and the port to connect to
48+
final String hostname;
49+
final int port;
50+
try {
51+
final ParameterTool params = ParameterTool.fromArgs(args);
52+
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
53+
port = params.has("port") ? params.getInt("port") : 9000;
54+
} catch (Exception e) {
55+
System.err.println(
56+
"No port specified. Please run 'SocketWindowWordCount "
57+
+ "--hostname <hostname> --port <port>', where hostname (localhost by default) "
58+
+ "and port is the address of the text server");
59+
System.err.println(
60+
"To start a simple text server, run 'netcat -l <port>' and "
61+
+ "type the input text into the command line");
62+
return;
63+
}
64+
65+
// get the execution environment
66+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
67+
68+
// get input data by connecting to the socket
69+
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
70+
71+
// parse the data, group it, window it, and aggregate the counts
72+
DataStream<WordWithCount> windowCounts =
73+
text.flatMap(
74+
new FlatMapFunction<String, WordWithCount>() {
75+
@Override
76+
public void flatMap(
77+
String value, Collector<WordWithCount> out) {
78+
for (String word : value.split("\\s")) {
79+
out.collect(new WordWithCount(word, 1L));
80+
}
81+
}
82+
})
83+
.keyBy(value -> value.word)
84+
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
85+
.reduce(
86+
new ReduceFunction<WordWithCount>() {
87+
@Override
88+
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
89+
return new WordWithCount(a.word, a.count + b.count);
90+
}
91+
});
92+
93+
// print the results with a single thread, rather than in parallel
94+
windowCounts.print().setParallelism(1);
95+
96+
env.execute("Socket Window WordCount");
97+
}
98+
99+
// ------------------------------------------------------------------------
100+
101+
/** Data type for words with count. */
102+
public static class WordWithCount {
103+
104+
public String word;
105+
public long count;
106+
107+
public WordWithCount() {}
108+
109+
public WordWithCount(String word, long count) {
110+
this.word = word;
111+
this.count = count;
112+
}
113+
114+
@Override
115+
public String toString() {
116+
return word + " : " + count;
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)