/*
 * Copyright 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16+
package com.example;
18+
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.json.JSONArray;
import org.json.JSONObject;
44+
45+ public class JsonWriterStreamCdc {
46+
47+ private static final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type" ;
48+
49+ public static void main (String [] args ) throws Exception {
50+ if (args .length < 4 ) {
51+ System .out .println ("Arguments: project, dataset, table, source_file" );
52+ return ;
53+ }
54+
55+ String projectId = args [0 ];
56+ String datasetName = args [1 ];
57+ String tableName = args [2 ];
58+ String dataFile = args [3 ];
59+ createDestinationTable (projectId , datasetName , tableName );
60+ writeToDefaultStream (projectId , datasetName , tableName , dataFile );
61+ }
62+
63+ public static void createDestinationTable (
64+ String projectId , String datasetName , String tableName ) {
65+ BigQuery bigquery = BigQueryOptions .getDefaultInstance ().getService ();
66+ // Create a schema that matches the source data.
67+ Schema schema =
68+ Schema .of (
69+ Field .of ("commit" , StandardSQLTypeName .STRING ),
70+ Field .newBuilder ("parent" , StandardSQLTypeName .STRING )
71+ .setMode (Field .Mode .REPEATED )
72+ .build (),
73+ Field .of ("author" , StandardSQLTypeName .STRING ),
74+ Field .of ("committer" , StandardSQLTypeName .STRING ),
75+ Field .of ("commit_date" , StandardSQLTypeName .DATETIME ),
76+ Field .of (
77+ "commit_msg" ,
78+ StandardSQLTypeName .STRUCT ,
79+ FieldList .of (
80+ Field .of ("subject" , StandardSQLTypeName .STRING ),
81+ Field .of ("message" , StandardSQLTypeName .STRING ))),
82+ Field .of ("repo_name" , StandardSQLTypeName .STRING ));
83+
84+ // Create a table that uses this schema.
85+ TableId tableId = TableId .of (projectId , datasetName , tableName );
86+ Table table = bigquery .getTable (tableId );
87+ if (table == null ) {
88+ TableInfo tableInfo =
89+ TableInfo .newBuilder (tableId , StandardTableDefinition .of (schema )).build ();
90+ bigquery .create (tableInfo );
91+ }
92+ }
93+
94+ // writeToDefaultStream: Writes records from the source file to the destination table.
95+ public static void writeToDefaultStream (
96+ String projectId , String datasetName , String tableName , String dataFile )
97+ throws DescriptorValidationException , InterruptedException , IOException {
98+
99+ BigQuery bigquery = BigQueryOptions .getDefaultInstance ().getService ();
100+
101+ // Get the schema of the destination table and convert to the equivalent BigQueryStorage type.
102+ Table table = bigquery .getTable (datasetName , tableName );
103+ Schema schema = table .getDefinition ().getSchema ();
104+ TableSchema tableSchema = BqToBqStorageSchemaConverter .convertTableSchema (schema );
105+
106+ // Use the JSON stream writer to send records in JSON format.
107+ TableName parentTable = TableName .of (projectId , datasetName , tableName );
108+ try (JsonStreamWriter writer =
109+ JsonStreamWriter .newBuilder (parentTable .toString (),
110+ addPseudoColumnsIfNeeded (tableSchema ))
111+ .build ()) {
112+ // Read JSON data from the source file and send it to the Write API.
113+ BufferedReader reader = new BufferedReader (new FileReader (dataFile ));
114+ String line = reader .readLine ();
115+ while (line != null ) {
116+ // As a best practice, send batches of records, instead of single records at a time.
117+ JSONArray jsonArr = new JSONArray ();
118+ for (int i = 0 ; i < 100 ; i ++) {
119+ JSONObject record = new JSONObject (line );
120+ jsonArr .put (record );
121+ line = reader .readLine ();
122+ if (line == null ) {
123+ break ;
124+ }
125+ } // batch
126+ ApiFuture <AppendRowsResponse > future = writer .append (jsonArr );
127+ // The append method is asynchronous. Rather than waiting for the method to complete,
128+ // which can hurt performance, register a completion callback and continue streaming.
129+ ApiFutures .addCallback (
130+ future , new AppendCompleteCallback (), MoreExecutors .directExecutor ());
131+ }
132+ }
133+ }
134+
135+ private static TableSchema addPseudoColumnsIfNeeded (TableSchema tableSchema ) {
136+ return tableSchema
137+ .toBuilder ()
138+ .addFields (
139+ TableFieldSchema .newBuilder ()
140+ .setType (TableFieldSchema .Type .STRING )
141+ .setMode (TableFieldSchema .Mode .NULLABLE )
142+ .build ())
143+ .addFields (
144+ TableFieldSchema .newBuilder ()
145+ .setName (CHANGE_TYPE_PSEUDO_COLUMN )
146+ .setType (TableFieldSchema .Type .STRING )
147+ .setMode (TableFieldSchema .Mode .NULLABLE )
148+ .build ())
149+ .build ();
150+ }
151+ }