Skip to content

Commit 86eee4e

Browse files
author
James Lee
committed
add StackOverFlowSurveyFollowUp
1 parent 35b15dc commit 86eee4e

2 files changed

Lines changed: 52 additions & 1 deletion

File tree

src/main/java/com/sparkTutorial/advanced/accumulator/StackOverFlowSurvey.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.apache.spark.SparkContext;
55
import org.apache.spark.api.java.JavaRDD;
66
import org.apache.spark.api.java.JavaSparkContext;
7+
import org.apache.spark.util.AccumulatorV2;
78
import org.apache.spark.util.LongAccumulator;
89
import scala.Option;
910

@@ -23,7 +24,6 @@ public static void main(String[] args) throws Exception {
2324
total.register(sparkContext, Option.apply("total"), false);
2425
missingSalaryMidPoint.register(sparkContext, Option.apply("missing salary middle point"), false);
2526

26-
2727
JavaRDD<String> responseRDD = javaSparkContext.textFile("in/2016-stack-overflow-survey-responses.csv");
2828

2929
JavaRDD<String> responseFromCanada = responseRDD.filter(response -> {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
package com.sparkTutorial.advanced.accumulator;

import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

import scala.Option;
10+
public class StackOverFlowSurveyFollowUp {
11+
12+
public static void main(String[] args) throws Exception {
13+
14+
SparkConf conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
15+
16+
SparkContext sparkContext = new SparkContext(conf);
17+
18+
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
19+
20+
final LongAccumulator total = new LongAccumulator();
21+
final LongAccumulator missingSalaryMidPoint = new LongAccumulator();
22+
final LongAccumulator processedBytes = new LongAccumulator();
23+
24+
total.register(sparkContext, Option.apply("total"), false);
25+
missingSalaryMidPoint.register(sparkContext, Option.apply("missing salary middle point"), false);
26+
processedBytes.register(sparkContext, Option.apply("Processed bytes"), true);
27+
28+
JavaRDD<String> responseRDD = javaSparkContext.textFile("in/2016-stack-overflow-survey-responses.csv");
29+
30+
JavaRDD<String> responseFromCanada = responseRDD.filter(response -> {
31+
32+
processedBytes.add(response.getBytes().length);
33+
34+
String[] splits = response.split(",", -1);
35+
36+
total.add(1);
37+
38+
if (splits[14].equals("")) {
39+
missingSalaryMidPoint.add(1);
40+
}
41+
42+
return splits[2].equals("Canada");
43+
44+
});
45+
46+
System.out.println("Count of responses from Canada: " + responseFromCanada.count());
47+
System.out.println("Number of bytes processed: " + processedBytes.value());
48+
System.out.println("Total count of responses: " + total.value());
49+
System.out.println("Count of responses missing salary middle point: " + missingSalaryMidPoint.value());
50+
}
51+
}

0 commit comments

Comments
 (0)