Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit e054007

Browse files
committed
feat: Adding CDC Sample
1 parent 5df548a commit e054007

3 files changed

Lines changed: 312 additions & 0 deletions

File tree

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
22+
import com.google.cloud.bigquery.BigQuery;
23+
import com.google.cloud.bigquery.BigQueryOptions;
24+
import com.google.cloud.bigquery.Field;
25+
import com.google.cloud.bigquery.FieldList;
26+
import com.google.cloud.bigquery.Schema;
27+
import com.google.cloud.bigquery.StandardSQLTypeName;
28+
import com.google.cloud.bigquery.StandardTableDefinition;
29+
import com.google.cloud.bigquery.Table;
30+
import com.google.cloud.bigquery.TableId;
31+
import com.google.cloud.bigquery.TableInfo;
32+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
33+
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
34+
import com.google.cloud.bigquery.storage.v1.TableName;
35+
import com.google.cloud.bigquery.storage.v1.TableSchema;
36+
import com.google.common.util.concurrent.MoreExecutors;
37+
import com.google.protobuf.Descriptors.DescriptorValidationException;
38+
import java.io.BufferedReader;
39+
import java.io.FileReader;
40+
import java.io.IOException;
41+
import org.json.JSONArray;
42+
import org.json.JSONObject;
43+
44+
class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
45+
private static final Object lock = new Object();
46+
private static int batchCount = 0;
47+
48+
public void onSuccess(AppendRowsResponse response) {
49+
synchronized (lock) {
50+
if (response.hasError()) {
51+
System.out.format("Error: %s\n", response.getError());
52+
} else {
53+
++batchCount;
54+
System.out.format("Wrote batch %d\n", batchCount);
55+
}
56+
}
57+
}
58+
59+
public void onFailure(Throwable throwable) {
60+
System.out.format("Error: %s\n", throwable.toString());
61+
}
62+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/*
18+
* Class is copied from java-bigquerystorage/samples snippet, as a temporary workaround
19+
* to the fact there is no built-in converter between the REST object {@link com.google.cloud.bigquery.Schema}
20+
* and the gRPC/Proto based {@link com.google.cloud.bigquery.storage.v1.TableSchema}.
21+
* https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java
22+
*/
23+
24+
package com.example.bigquerystorage;
25+
26+
import com.google.cloud.bigquery.Field;
27+
import com.google.cloud.bigquery.Schema;
28+
import com.google.cloud.bigquery.StandardSQLTypeName;
29+
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
30+
import com.google.cloud.bigquery.storage.v1.TableSchema;
31+
import com.google.common.collect.ImmutableMap;
32+
33+
/** Converts structure from BigQuery client to BigQueryStorage client */
34+
public class BqToBqStorageSchemaConverter {
35+
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
36+
ImmutableMap.of(
37+
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
38+
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
39+
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);
40+
41+
private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
42+
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
43+
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
44+
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
45+
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
46+
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
47+
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
48+
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
49+
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
50+
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
51+
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
52+
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
53+
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
54+
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
55+
// Below this comment: mappings manually added by the Datastream team
56+
.put(StandardSQLTypeName.BIGNUMERIC, TableFieldSchema.Type.BIGNUMERIC)
57+
.put(StandardSQLTypeName.JSON, TableFieldSchema.Type.JSON)
58+
.put(StandardSQLTypeName.INTERVAL, TableFieldSchema.Type.INTERVAL)
59+
.build();
60+
61+
/**
62+
* Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
63+
*
64+
* @param schema the BigQuery client Table Schema
65+
* @return the bigquery storage API Table Schema
66+
*/
67+
public static TableSchema convertTableSchema(Schema schema) {
68+
TableSchema.Builder result = TableSchema.newBuilder();
69+
for (int i = 0; i < schema.getFields().size(); i++) {
70+
result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
71+
}
72+
return result.build();
73+
}
74+
75+
/**
76+
* Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
77+
*
78+
* @param field the BigQuery client Field Schema
79+
* @return the bigquery storage API Field Schema
80+
*/
81+
public static TableFieldSchema convertFieldSchema(Field field) {
82+
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
83+
if (field.getMode() == null) {
84+
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
85+
}
86+
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
87+
result.setName(field.getName());
88+
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
89+
if (field.getDescription() != null) {
90+
result.setDescription(field.getDescription());
91+
}
92+
if (field.getSubFields() != null) {
93+
for (int i = 0; i < field.getSubFields().size(); i++) {
94+
result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
95+
}
96+
}
97+
return result.build();
98+
}
99+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
22+
import com.google.cloud.bigquery.BigQuery;
23+
import com.google.cloud.bigquery.BigQueryOptions;
24+
import com.google.cloud.bigquery.Field;
25+
import com.google.cloud.bigquery.FieldList;
26+
import com.google.cloud.bigquery.Schema;
27+
import com.google.cloud.bigquery.StandardSQLTypeName;
28+
import com.google.cloud.bigquery.StandardTableDefinition;
29+
import com.google.cloud.bigquery.Table;
30+
import com.google.cloud.bigquery.TableId;
31+
import com.google.cloud.bigquery.TableInfo;
32+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
33+
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
34+
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
35+
import com.google.cloud.bigquery.storage.v1.TableName;
36+
import com.google.cloud.bigquery.storage.v1.TableSchema;
37+
import com.google.common.util.concurrent.MoreExecutors;
38+
import com.google.protobuf.Descriptors.DescriptorValidationException;
39+
import java.io.BufferedReader;
40+
import java.io.FileReader;
41+
import java.io.IOException;
42+
import org.json.JSONArray;
43+
import org.json.JSONObject;
44+
45+
public class JsonWriterStreamCdc {
46+
47+
private static final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type";
48+
49+
public static void main(String[] args) throws Exception {
50+
if (args.length < 4) {
51+
System.out.println("Arguments: project, dataset, table, source_file");
52+
return;
53+
}
54+
55+
String projectId = args[0];
56+
String datasetName = args[1];
57+
String tableName = args[2];
58+
String dataFile = args[3];
59+
createDestinationTable(projectId, datasetName, tableName);
60+
writeToDefaultStream(projectId, datasetName, tableName, dataFile);
61+
}
62+
63+
public static void createDestinationTable(
64+
String projectId, String datasetName, String tableName) {
65+
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
66+
// Create a schema that matches the source data.
67+
Schema schema =
68+
Schema.of(
69+
Field.of("commit", StandardSQLTypeName.STRING),
70+
Field.newBuilder("parent", StandardSQLTypeName.STRING)
71+
.setMode(Field.Mode.REPEATED)
72+
.build(),
73+
Field.of("author", StandardSQLTypeName.STRING),
74+
Field.of("committer", StandardSQLTypeName.STRING),
75+
Field.of("commit_date", StandardSQLTypeName.DATETIME),
76+
Field.of(
77+
"commit_msg",
78+
StandardSQLTypeName.STRUCT,
79+
FieldList.of(
80+
Field.of("subject", StandardSQLTypeName.STRING),
81+
Field.of("message", StandardSQLTypeName.STRING))),
82+
Field.of("repo_name", StandardSQLTypeName.STRING));
83+
84+
// Create a table that uses this schema.
85+
TableId tableId = TableId.of(projectId, datasetName, tableName);
86+
Table table = bigquery.getTable(tableId);
87+
if (table == null) {
88+
TableInfo tableInfo =
89+
TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
90+
bigquery.create(tableInfo);
91+
}
92+
}
93+
94+
// writeToDefaultStream: Writes records from the source file to the destination table.
95+
public static void writeToDefaultStream(
96+
String projectId, String datasetName, String tableName, String dataFile)
97+
throws DescriptorValidationException, InterruptedException, IOException {
98+
99+
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
100+
101+
// Get the schema of the destination table and convert to the equivalent BigQueryStorage type.
102+
Table table = bigquery.getTable(datasetName, tableName);
103+
Schema schema = table.getDefinition().getSchema();
104+
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
105+
106+
// Use the JSON stream writer to send records in JSON format.
107+
TableName parentTable = TableName.of(projectId, datasetName, tableName);
108+
try (JsonStreamWriter writer =
109+
JsonStreamWriter.newBuilder(parentTable.toString(),
110+
addPseudoColumnsIfNeeded(tableSchema))
111+
.build()) {
112+
// Read JSON data from the source file and send it to the Write API.
113+
BufferedReader reader = new BufferedReader(new FileReader(dataFile));
114+
String line = reader.readLine();
115+
while (line != null) {
116+
// As a best practice, send batches of records, instead of single records at a time.
117+
JSONArray jsonArr = new JSONArray();
118+
for (int i = 0; i < 100; i++) {
119+
JSONObject record = new JSONObject(line);
120+
jsonArr.put(record);
121+
line = reader.readLine();
122+
if (line == null) {
123+
break;
124+
}
125+
} // batch
126+
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
127+
// The append method is asynchronous. Rather than waiting for the method to complete,
128+
// which can hurt performance, register a completion callback and continue streaming.
129+
ApiFutures.addCallback(
130+
future, new AppendCompleteCallback(), MoreExecutors.directExecutor());
131+
}
132+
}
133+
}
134+
135+
private static TableSchema addPseudoColumnsIfNeeded(TableSchema tableSchema) {
136+
return tableSchema
137+
.toBuilder()
138+
.addFields(
139+
TableFieldSchema.newBuilder()
140+
.setType(TableFieldSchema.Type.STRING)
141+
.setMode(TableFieldSchema.Mode.NULLABLE)
142+
.build())
143+
.addFields(
144+
TableFieldSchema.newBuilder()
145+
.setName(CHANGE_TYPE_PSEUDO_COLUMN)
146+
.setType(TableFieldSchema.Type.STRING)
147+
.setMode(TableFieldSchema.Mode.NULLABLE)
148+
.build())
149+
.build();
150+
}
151+
}

0 commit comments

Comments
 (0)