Skip to content

Commit 94b1abc

Browse files
committed
feat: add flink
1 parent 97604e9 commit 94b1abc

File tree

9 files changed

+249
-1
lines changed

9 files changed

+249
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Spring-Security/springsecurity.iml
1313
rocketmqdemo/rocketmqdemo.iml
1414

1515
# target
16+
Flink/target
1617
JdkLearn/target
1718
Spring/target
1819
Spring-AOP/target

Flink/pom.xml

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>Flink</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.28</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Flink client libraries (Scala 2.12 build); pulls in the DataSet and DataStream APIs. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.example;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.Collections;

/**
 * Minimal Flink "hello world": builds a one-element batch DataSet and prints it.
 *
 * <p>Uses the DataSet (batch) API; {@code print()} is a sink that triggers
 * execution itself, so no explicit {@code env.execute()} call is required.
 */
public class App {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Single-element source; fromCollection requires a non-empty, serializable collection.
        DataSource<String> dataSource = env.fromCollection(Collections.singletonList("Hello Word"));
        dataSource.print();
        System.out.println("Hello World!");
    }
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.example;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.ExecutionEnvironment;
5+
import org.apache.flink.api.java.operators.DataSource;
6+
import org.apache.flink.api.java.tuple.Tuple;
7+
import org.apache.flink.api.java.tuple.Tuple2;
8+
9+
import org.apache.flink.util.Collector;
10+
11+
// 离线批处理
12+
public class BatchWordCount {
13+
public static void main(String[] args) throws Exception{
14+
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
15+
final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt";
16+
DataSource<String> dataSource = environment.readTextFile(fileName);
17+
dataSource
18+
.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
19+
public void flatMap(String s, Collector<Tuple2<String,Integer>> collector) throws Exception{
20+
//对读取到的每一行数据按照空格分割
21+
String[] split = s.split(" ");
22+
//将每个单词放入collector中作为输出,格式类似于{word:1}
23+
for (String word : split) {
24+
collector.collect(new Tuple2<String, Integer>(word, 1));
25+
}
26+
}
27+
})
28+
.groupBy(0)
29+
.sum(1)
30+
.print();
31+
//environment.execute();
32+
}
33+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.example;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.ExecutionEnvironment;
5+
import org.apache.flink.api.java.operators.DataSource;
6+
import org.apache.flink.api.java.tuple.Tuple;
7+
import org.apache.flink.api.java.tuple.Tuple2;
8+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
9+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
import org.apache.flink.util.Collector;
11+
12+
public class SocketStreamWordCount {
13+
public static void main(String[] args) throws Exception {
14+
//获取Flink批处理执行环境
15+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
16+
17+
final String host = "localhost";
18+
final int port = 8000;
19+
//从socket中获取数据源
20+
DataStreamSource<String> source = environment.socketTextStream(host, port);
21+
//单词计数
22+
source
23+
//将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1
24+
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
25+
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
26+
//对读取到的每一行数据按照空格分割
27+
String[] split = s.split(" ");
28+
//将每个单词放入collector中作为输出,格式类似于{word:1}
29+
for (String word : split) {
30+
collector.collect(new Tuple2<String, Integer>(word, 1));
31+
}
32+
}
33+
})
34+
//聚合算子,按照第一个字段(即word字段)进行分组
35+
.keyBy(v -> v.f0)
36+
//聚合算子,对每一个分租内的数据按照第二个字段进行求和
37+
.sum(1)
38+
.print();
39+
40+
environment.execute();
41+
}
42+
}
43+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.example;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.tuple.Tuple2;
5+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
6+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7+
import org.apache.flink.util.Collector;
8+
9+
public class StreamWordCount {
10+
public static void main(String[] args)throws Exception{
11+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
12+
13+
//从文件中获取数据源
14+
final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt";
15+
DataStreamSource<String> source = environment.readTextFile(fileName);
16+
//单词计数
17+
source
18+
//将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1
19+
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
20+
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
21+
//对读取到的每一行数据按照空格分割
22+
String[] split = s.split(" ");
23+
//将每个单词放入collector中作为输出,格式类似于{word:1}
24+
for (String word : split) {
25+
collector.collect(new Tuple2<String, Integer>(word, 1));
26+
}
27+
}
28+
})
29+
//聚合算子,按照第一个字段(即word字段)进行分组
30+
.keyBy(v -> v.f0)
31+
//聚合算子,对每一个分租内的数据按照第二个字段进行求和
32+
.sum(1)
33+
.print();
34+
35+
environment.execute();
36+
}
37+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
hello word
2+
hello flink
3+
hello java
4+
java is best
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.example;
2+
3+
import static org.junit.Assert.assertTrue;
4+
5+
import org.junit.Test;
6+
7+
/**
8+
* Unit test for simple App.
9+
*/
10+
public class AppTest
11+
{
12+
/**
13+
* Rigorous Test :-)
14+
*/
15+
@Test
16+
public void shouldAnswerWithTrue()
17+
{
18+
assertTrue( true );
19+
}
20+
}

JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import lombok.AllArgsConstructor;
44
import lombok.Data;
5-
import sun.java2d.pipe.SpanShapeRenderer;
65

76
import java.text.SimpleDateFormat;
87
import java.util.Arrays;

0 commit comments

Comments
 (0)