|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, software |
| 13 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + * See the License for the specific language governing permissions and |
| 16 | + * limitations under the License. |
| 17 | + */ |
| 18 | +package org.apache.beam.examples; |
| 19 | + |
| 20 | +import org.apache.beam.sdk.Pipeline; |
| 21 | +import org.apache.beam.sdk.io.TextIO; |
| 22 | +import org.apache.beam.sdk.options.Default; |
| 23 | +import org.apache.beam.sdk.options.Description; |
| 24 | +import org.apache.beam.sdk.options.PipelineOptions; |
| 25 | +import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| 26 | +import org.apache.beam.sdk.options.Validation.Required; |
| 27 | +import org.apache.beam.sdk.transforms.DoFn; |
| 28 | +import org.apache.beam.sdk.transforms.GroupByKey; |
| 29 | +import org.apache.beam.sdk.transforms.PTransform; |
| 30 | +import org.apache.beam.sdk.transforms.ParDo; |
| 31 | +import org.apache.beam.sdk.values.KV; |
| 32 | +import org.apache.beam.sdk.values.PCollection; |
| 33 | + |
| 34 | +/** |
| 35 | + * A naive simulation of the Variant Transforms pipeline. |
| 36 | + * Modified from the WordCount example of the Beam Java SDK. |
| 37 | + */ |
| 38 | +public class CallMerger { |
| 39 | + |
| 40 | + /** |
| 41 | + * This DoFn filters out lines starting with "##" and create a key for others. |
| 42 | + */ |
| 43 | + static class FilterOrKeyDoFn extends DoFn<String, KV<String, String>> { |
| 44 | + |
| 45 | + @ProcessElement |
| 46 | + public void processElement(ProcessContext c) { |
| 47 | + if (c.element().startsWith("##")) return; |
| 48 | + // Split the line into words. |
| 49 | + String[] words = c.element().trim().split("\\s+"); |
| 50 | + if (words.length < 5) return; |
| 51 | + StringBuilder b = new StringBuilder(); |
| 52 | + b.append(words[0]).append(":"); |
| 53 | + b.append(words[1]).append(":"); |
| 54 | + b.append(words[2]).append(":"); |
| 55 | + b.append(words[3]).append(":"); |
| 56 | + b.append(words[4]); |
| 57 | + String key = b.toString(); |
| 58 | + |
| 59 | + c.output(KV.of(key, c.element())); |
| 60 | + } |
| 61 | + } |
| 62 | + |
| 63 | + /** |
| 64 | + * This DoFn merges lines with the same key by taking the first full line and |
| 65 | + * adding the last word of other lines (e.g., simulating merging samples). |
| 66 | + */ |
| 67 | + static class MergeDoFn extends DoFn<KV<String, Iterable<String>>, String> { |
| 68 | + |
| 69 | + @ProcessElement |
| 70 | + public void processElement(ProcessContext c) { |
| 71 | + boolean first = true; |
| 72 | + StringBuilder b = new StringBuilder(); |
| 73 | + for (String line : c.element().getValue()) { |
| 74 | + if (first) { |
| 75 | + b.append(line); |
| 76 | + first = false; |
| 77 | + } else { |
| 78 | + String[] words = line.split("\\s+"); |
| 79 | + // This section is added for making this a CPU intensive DoFn. |
| 80 | + int s = 1; |
| 81 | + for (int i = 0; i < 10000; i++) { |
| 82 | + for (String word : words) { |
| 83 | + s = (s * (i + word.length()) + 1) % 1000; |
| 84 | + } |
| 85 | + } |
| 86 | + // End of dummy CPU intensive part. |
| 87 | + if (words.length > 0) { |
| 88 | + b.append("\t").append(words[words.length - 1]).append("\t").append(s); |
| 89 | + } |
| 90 | + } |
| 91 | + } |
| 92 | + c.output(b.toString()); |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + /** |
| 97 | + * A PTransform that converts a PCollection containing lines of text into a |
| 98 | + * PCollection of merged lines based on line keys. |
| 99 | + */ |
| 100 | + public static class MergeCalls extends PTransform<PCollection<String>, |
| 101 | + PCollection<String>> { |
| 102 | + @Override |
| 103 | + public PCollection<String> expand(PCollection<String> lines) { |
| 104 | + |
| 105 | + // Convert lines of text into individual words. |
| 106 | + PCollection<KV<String, String>> keyLines = lines.apply( |
| 107 | + ParDo.of(new FilterOrKeyDoFn())); |
| 108 | + |
| 109 | + PCollection<KV<String, Iterable<String>>> groupedLines = |
| 110 | + keyLines.apply(GroupByKey.<String, String>create()); |
| 111 | + |
| 112 | + PCollection<String> mergedLines = groupedLines.apply( |
| 113 | + ParDo.of(new MergeDoFn())); |
| 114 | + return mergedLines; |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + /** |
| 119 | + * Options supported by {@link CallMerger}. |
| 120 | + */ |
| 121 | + public interface CallMergerOptions extends PipelineOptions { |
| 122 | + |
| 123 | + /** |
| 124 | + * By default, this example reads from a public dataset containing the text |
| 125 | + * of King Lear. Set this option to choose a different input file or glob. |
| 126 | + */ |
| 127 | + @Description("Path of the file to read from") |
| 128 | + @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") |
| 129 | + String getInputFile(); |
| 130 | + void setInputFile(String value); |
| 131 | + |
| 132 | + /** |
| 133 | + * Set this required option to specify where to write the output. |
| 134 | + */ |
| 135 | + @Description("Path of the file to write to") |
| 136 | + @Required |
| 137 | + String getOutput(); |
| 138 | + void setOutput(String value); |
| 139 | + } |
| 140 | + |
| 141 | + public static void main(String[] args) { |
| 142 | + CallMergerOptions options = PipelineOptionsFactory.fromArgs(args) |
| 143 | + .withValidation().as(CallMergerOptions.class); |
| 144 | + Pipeline p = Pipeline.create(options); |
| 145 | + |
| 146 | + p.apply("ReadLines", TextIO.read().from(options.getInputFile())) |
| 147 | + .apply(new MergeCalls()) |
| 148 | + .apply("WriteCounts", TextIO.write().to(options.getOutput())); |
| 149 | + |
| 150 | + p.run().waitUntilFinish(); |
| 151 | + } |
| 152 | +} |
0 commit comments