1- /*
2- * Licensed to the Apache Software Foundation (ASF) under one
3- * or more contributor license agreements. See the NOTICE file
4- * distributed with this work for additional information
5- * regarding copyright ownership. The ASF licenses this file
6- * to you under the Apache License, Version 2.0 (the
7- * "License"); you may not use this file except in compliance
8- * with the License. You may obtain a copy of the License at
9- *
10- * http://www.apache.org/licenses/LICENSE-2.0
11- *
12- * Unless required by applicable law or agreed to in writing, software
13- * distributed under the License is distributed on an "AS IS" BASIS,
14- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15- * See the License for the specific language governing permissions and
16- * limitations under the License.
17- */
18- package org .apache .beam .samples ;
1+ // Copyright 2020 Google Inc.
2+ //
3+ // Licensed under the Apache License, Version 2.0 (the "License");
4+ // you may not use this file except in compliance with the License.
5+ // You may obtain a copy of the License at
6+ //
7+ // http://www.apache.org/licenses/LICENSE-2.0
8+ //
9+ // Unless required by applicable law or agreed to in writing, software
10+ // distributed under the License is distributed on an "AS IS" BASIS,
11+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ // See the License for the specific language governing permissions and
13+ // limitations under the License.
1914
20- import java . util . Arrays ;
15+ package org . apache . beam . samples ;
2116
2217import com .google .api .services .bigquery .model .TableFieldSchema ;
2318import com .google .api .services .bigquery .model .TableRow ;
2419import com .google .api .services .bigquery .model .TableSchema ;
2520import com .google .gson .Gson ;
2621import com .google .pubsub .v1 .ProjectSubscriptionName ;
27-
22+ import java . util . Arrays ;
2823import org .apache .avro .reflect .Nullable ;
2924import org .apache .beam .sdk .Pipeline ;
3025import org .apache .beam .sdk .coders .AvroCoder ;
3126import org .apache .beam .sdk .coders .DefaultCoder ;
3227import org .apache .beam .sdk .extensions .gcp .options .GcpOptions ;
3328import org .apache .beam .sdk .extensions .sql .SqlTransform ;
29+ import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO ;
3430import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO .Write .CreateDisposition ;
3531import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO .Write .WriteDisposition ;
36- import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO ;
3732import org .apache .beam .sdk .io .gcp .pubsub .PubsubIO ;
3833import org .apache .beam .sdk .options .Default ;
3934import org .apache .beam .sdk .options .Description ;
4035import org .apache .beam .sdk .options .PipelineOptionsFactory ;
4136import org .apache .beam .sdk .options .StreamingOptions ;
4237import org .apache .beam .sdk .options .Validation ;
4338import org .apache .beam .sdk .schemas .Schema ;
44- import org .apache .beam .sdk .transforms .Create ;
4539import org .apache .beam .sdk .transforms .MapElements ;
4640import org .apache .beam .sdk .transforms .WithTimestamps ;
4741import org .apache .beam .sdk .transforms .windowing .FixedWindows ;
 * An Apache Beam streaming pipeline that reads JSON-encoded messages from Pub/Sub,
5852 * uses Beam SQL to transform the message data, and writes the results to a BigQuery.
5953 */
60- public class StreamingBeamSQL {
61- private static final Logger LOG = LoggerFactory .getLogger (StreamingBeamSQL .class );
54+ public class StreamingBeamSql {
55+ private static final Logger LOG = LoggerFactory .getLogger (StreamingBeamSql .class );
6256 private static final Gson GSON = new Gson ();
6357
6458 public interface Options extends StreamingOptions {
6559 @ Description ("Pub/Sub subscription to read from." )
6660 @ Validation .Required
6761 String getInputSubscription ();
62+
6863 void setInputSubscription (String value );
6964
70- @ Description ("BigQuery table to write to, in the form 'project:dataset.table' or 'dataset.table'." )
65+ @ Description ("BigQuery table to write to, in the form "
66+ + "'project:dataset.table' or 'dataset.table'." )
7167 @ Default .String ("beam_samples.streaming_beam_sql" )
7268 String getOutputTable ();
69+
7370 void setOutputTable (String value );
7471 }
7572
7673 @ DefaultCoder (AvroCoder .class )
7774 private static class PageReviewMessage {
78- @ Nullable String url ;
79- @ Nullable String review ;
75+ @ Nullable
76+ String url ;
77+ @ Nullable
78+ String review ;
8079 }
8180
8281 public static void main (final String [] args ) {
8382 Options options = PipelineOptionsFactory .fromArgs (args ).withValidation ().as (Options .class );
8483 options .setStreaming (true );
8584
8685 var project = options .as (GcpOptions .class ).getProject ();
87- var subscription = ProjectSubscriptionName .of (project , options .getInputSubscription ()).toString ();
86+ var subscription = ProjectSubscriptionName
87+ .of (project , options .getInputSubscription ()).toString ();
8888
8989 var schema = Schema .builder ()
9090 .addStringField ("url" )
@@ -96,47 +96,50 @@ public static void main(final String[] args) {
9696 pipeline
9797 // Read, parse, and validate messages from Pub/Sub.
9898 .apply ("Read messages from Pub/Sub" , PubsubIO .readStrings ().fromSubscription (subscription ))
99- .apply ("Parse JSON into SQL rows" , MapElements .into (TypeDescriptor .of (Row .class )).via (message -> {
100- // This is a good place to add error handling.
101- // The first transform should act as a validation layer to make sure
102- // that any data coming to the processing pipeline must be valid.
103- // See `MapElements.MapWithFailures` for more details.
104- LOG .info ("message: {}" , message );
105- var msg = GSON .fromJson (message , PageReviewMessage .class );
106- return Row .withSchema (schema ).addValues (
107- msg .url , // row url
108- msg .review .equals ("positive" ) ? 1.0 : 0.0 , // row page_score
109- new Instant () // row processing_time
110- ).build ();
111- })).setRowSchema (schema ) // make sure to set the row schema for the PCollection
99+ .apply ("Parse JSON into SQL rows" , MapElements .into (TypeDescriptor .of (Row .class ))
100+ .via (message -> {
101+ // This is a good place to add error handling.
102+ // The first transform should act as a validation layer to make sure
103+ // that any data coming to the processing pipeline must be valid.
104+ // See `MapElements.MapWithFailures` for more details.
105+ LOG .info ("message: {}" , message );
106+ var msg = GSON .fromJson (message , PageReviewMessage .class );
107+ return Row .withSchema (schema ).addValues (
108+ msg .url , // row url
109+ msg .review .equals ("positive" ) ? 1.0 : 0.0 , // row page_score
110+ new Instant () // row processing_time
111+ ).build ();
112+ })).setRowSchema (schema ) // make sure to set the row schema for the PCollection
112113
113114 // Add timestamps and bundle elements into windows.
114- .apply ("Add processing time" , WithTimestamps .of ((row ) -> row .getDateTime ("processing_time" ).toInstant ()))
115+ .apply ("Add processing time" , WithTimestamps
116+ .of ((row ) -> row .getDateTime ("processing_time" ).toInstant ()))
115117 .apply ("Fixed-size windows" , Window .into (FixedWindows .of (Duration .standardMinutes (1 ))))
116118
117119 // Apply a SQL query for every window of elements.
118120 .apply ("Run Beam SQL query" , SqlTransform .query (
119- "SELECT " +
120- " url, " +
121- " COUNT(page_score) AS num_reviews, " +
122- " AVG(page_score) AS score, " +
123- " MIN(processing_time) AS first_date, " +
124- " MAX(processing_time) AS last_date " +
125- "FROM PCOLLECTION " +
126- "GROUP BY url"
121+ "SELECT "
122+ + " url, "
123+ + " COUNT(page_score) AS num_reviews, "
124+ + " AVG(page_score) AS score, "
125+ + " MIN(processing_time) AS first_date, "
126+ + " MAX(processing_time) AS last_date "
127+ + "FROM PCOLLECTION "
128+ + "GROUP BY url"
127129 ))
128130
129131 // Convert the SQL Rows into BigQuery TableRows and write them to BigQuery.
130- .apply ("Convert to BigQuery TableRow" , MapElements .into (TypeDescriptor .of (TableRow .class )).via (row -> {
131- LOG .info ("rating summary: {} {} ({} reviews)" , row .getDouble ("score" ), row .getString ("url" ),
132- row .getInt64 ("num_reviews" ));
133- return new TableRow ()
134- .set ("url" , row .getString ("url" ))
135- .set ("num_reviews" , row .getInt64 ("num_reviews" ))
136- .set ("score" , row .getDouble ("score" ))
137- .set ("first_date" , row .getDateTime ("first_date" ).toInstant ().toString ())
138- .set ("last_date" , row .getDateTime ("last_date" ).toInstant ().toString ());
139- }))
132+ .apply ("Convert to BigQuery TableRow" , MapElements .into (TypeDescriptor .of (TableRow .class ))
133+ .via (row -> {
134+ LOG .info ("rating summary: {} {} ({} reviews)" , row .getDouble ("score" ),
135+ row .getString ("url" ), row .getInt64 ("num_reviews" ));
136+ return new TableRow ()
137+ .set ("url" , row .getString ("url" ))
138+ .set ("num_reviews" , row .getInt64 ("num_reviews" ))
139+ .set ("score" , row .getDouble ("score" ))
140+ .set ("first_date" , row .getDateTime ("first_date" ).toInstant ().toString ())
141+ .set ("last_date" , row .getDateTime ("last_date" ).toInstant ().toString ());
142+ }))
140143 .apply ("Write to BigQuery" , BigQueryIO .writeTableRows ()
141144 .to (options .getOutputTable ())
142145 .withSchema (new TableSchema ().setFields (Arrays .asList (
0 commit comments