From f5836d00c9231d7c0fcc05bdb1790c7aafa94459 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 00:23:47 +0000 Subject: [PATCH 01/11] . --- .../BqToBqStorageSchemaConverter.java | 88 ------------------- .../bigquerystorage/WriteToDefaultStream.java | 1 + 2 files changed, 1 insertion(+), 88 deletions(-) delete mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java deleted file mode 100644 index 27c5d1dbcb..0000000000 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.example.bigquerystorage; - -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.storage.v1.TableFieldSchema; -import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.common.collect.ImmutableMap; - -/** Converts structure from BigQuery client to BigQueryStorage client */ -public class BqToBqStorageSchemaConverter { - private static ImmutableMap BQTableSchemaModeMap = - ImmutableMap.of( - Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, - Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, - Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); - - private static ImmutableMap BQTableSchemaTypeMap = - new ImmutableMap.Builder() - .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) - .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) - .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) - .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) - .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) - .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) - .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) - .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) - .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) - .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) - .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) - .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) - .build(); - - /** - * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. - * - * @param schema the BigQuery client Table Schema - * @return the bigquery storage API Table Schema - */ - public static TableSchema convertTableSchema(Schema schema) { - TableSchema.Builder result = TableSchema.newBuilder(); - for (int i = 0; i < schema.getFields().size(); i++) { - result.addFields(i, convertFieldSchema(schema.getFields().get(i))); - } - return result.build(); - } - - /** - * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. - * - * @param field the BigQuery client Field Schema - * @return the bigquery storage API Field Schema - */ - public static TableFieldSchema convertFieldSchema(Field field) { - TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); - if (field.getMode() == null) { - field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); - } - result.setMode(BQTableSchemaModeMap.get(field.getMode())); - result.setName(field.getName()); - result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); - if (field.getDescription() != null) { - result.setDescription(field.getDescription()); - } - if (field.getSubFields() != null) { - for (int i = 0; i < field.getSubFields().size(); i++) { - result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); - } - } - return result.build(); - } -} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index f5f357238a..62dd2eef43 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -198,6 +198,7 @@ public void onSuccess(AppendRowsResponse response) { } public void onFailure(Throwable throwable) { + if (this.parent.streamWriter.isDone()) { // If the wrapped exception is a StatusRuntimeException, check the state of the operation. // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html From fcf6e9fb7c5f741171668bebe1117657f8298448 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 00:52:31 +0000 Subject: [PATCH 02/11] . --- .../bigquery/storage/v1/JsonStreamWriter.java | 5 +++++ .../bigquery/storage/v1/StreamWriter.java | 5 +++++ .../storage/v1/JsonStreamWriterTest.java | 9 +++++---- .../bigquery/storage/v1/StreamWriterTest.java | 4 ++++ .../bigquerystorage/WriteToDefaultStream.java | 18 +++++++++++++++--- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 5cf3cec250..080e237d05 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -391,6 +391,11 @@ public boolean isDone() { return this.streamWriter.isDone(); } + /** @return if user explicitly closed the writer. */ + public boolean isUserClosed() { + return this.streamWriter.isUserClosed(); + } + public static final class Builder { private String streamName; private BigQueryWriteClient client; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index e09467981c..bbd9a4914e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -435,6 +435,11 @@ public boolean isDone() { } } + /** @return if user explicitly closed the writer. */ + public boolean isUserClosed() { + return userClosed.get(); + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 1cb8000858..c3f60511c6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -1141,10 +1141,11 @@ public void testWriterId() @Test public void testIsDone() throws DescriptorValidationException, IOException, InterruptedException { - JsonStreamWriter writer1 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build(); - Assert.assertFalse(writer1.isDone()); - writer1.close(); - Assert.assertTrue(writer1.isDone()); + JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build(); + Assert.assertFalse(writer.isDone()); + writer.close(); + Assert.assertTrue(writer.isDone()); + Assert.assertTrue(writer.isUserClosed()); } private AppendRowsResponse createAppendResponse(long offset) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 731d6d6364..2b7ef0151d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1248,6 +1248,7 @@ public void testStreamWriterUserCloseMultiplexing() throws Exception { assertEquals( Status.Code.FAILED_PRECONDITION, ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + assertTrue(writer.isUserClosed()); } @Test(timeout = 10000) @@ -1267,6 +1268,7 @@ public void testStreamWriterUserCloseNoMultiplexing() throws Exception { assertEquals( Status.Code.FAILED_PRECONDITION, ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + assertTrue(writer.isUserClosed()); } @Test(timeout = 10000) @@ -1292,6 +1294,7 @@ public void testStreamWriterPermanentErrorMultiplexing() throws Exception { }); assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isDone()); + assertFalse(writer.isUserClosed()); } @Test(timeout = 10000) @@ -1313,5 +1316,6 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { }); assertTrue(writer.isDone()); assertTrue(ex.getCause() instanceof InvalidArgumentException); + assertFalse(writer.isUserClosed()); } } diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 62dd2eef43..b60740589b 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -135,6 +135,7 @@ private static class DataWriter { // Track the number of in-flight requests to wait for all responses before shutting down. private final Phaser inflightRequestCount = new Phaser(1); private final Object lock = new Object(); + private JsonStreamWriter streamWriter; @GuardedBy("lock") @@ -142,15 +143,24 @@ private static class DataWriter { public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { + initialize(parentTable.); + } + + public void reInitialize() { + initialize(streamWriter.getStreamName()); + } + + private void initialize(String streamOrTableName) { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html streamWriter = - JsonStreamWriter.newBuilder(parentTable.toString(), BigQueryWriteClient.create()).build(); + JsonStreamWriter.newBuilder(streamOrTableName, BigQueryWriteClient.create()) + .build(); } - public void append(AppendContext appendContext) + public synchronized void append(AppendContext appendContext) throws DescriptorValidationException, IOException { synchronized (this.lock) { // If earlier appends have failed, we need to reset before continuing. @@ -159,6 +169,9 @@ public void append(AppendContext appendContext) } } // Append asynchronously for increased throughput. + if (streamWriter.isDone() && !streamWriter.isUserClosed()) { + reInitialize(); + } ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); @@ -198,7 +211,6 @@ public void onSuccess(AppendRowsResponse response) { } public void onFailure(Throwable throwable) { - if (this.parent.streamWriter.isDone()) { // If the wrapped exception is a StatusRuntimeException, check the state of the operation. // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html From cde9e111ae7567065434be82a623513b602425e2 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:04:58 +0000 Subject: [PATCH 03/11] . --- .../example/bigquerystorage/WriteToDefaultStream.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index b60740589b..ed2f87033a 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -143,14 +143,10 @@ private static class DataWriter { public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { - initialize(parentTable.); + initialize(parentTable.toString()); } - public void reInitialize() { - initialize(streamWriter.getStreamName()); - } - - private void initialize(String streamOrTableName) { + private void initialize(String streamOrTableName) throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: @@ -170,7 +166,7 @@ public synchronized void append(AppendContext appendContext) } // Append asynchronously for increased throughput. if (streamWriter.isDone() && !streamWriter.isUserClosed()) { - reInitialize(); + initialize(streamWriter.getStreamName()); } ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( From fc28eaa366ccb048f331e6112876458915890909 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:08:35 +0000 Subject: [PATCH 04/11] . --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index ed2f87033a..11615e20d9 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -146,7 +146,8 @@ public void initialize(TableName parentTable) initialize(parentTable.toString()); } - private void initialize(String streamOrTableName) throws DescriptorValidationException, IOException, InterruptedException { + private void initialize(String streamOrTableName) + throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: From 0832d28ba32a47af1969360520509de606e82198 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:14:32 +0000 Subject: [PATCH 05/11] . --- .../bigquerystorage/WriteToDefaultStream.java | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 11615e20d9..fa0da13948 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -135,7 +135,6 @@ private static class DataWriter { // Track the number of in-flight requests to wait for all responses before shutting down. private final Phaser inflightRequestCount = new Phaser(1); private final Object lock = new Object(); - private JsonStreamWriter streamWriter; @GuardedBy("lock") @@ -143,21 +142,15 @@ private static class DataWriter { public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { - initialize(parentTable.toString()); - } - - private void initialize(String streamOrTableName) - throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html streamWriter = - JsonStreamWriter.newBuilder(streamOrTableName, BigQueryWriteClient.create()) - .build(); + JsonStreamWriter.newBuilder(parentTable.toString(), BigQueryWriteClient.create()).build(); } - public synchronized void append(AppendContext appendContext) + public void append(AppendContext appendContext) throws DescriptorValidationException, IOException { synchronized (this.lock) { // If earlier appends have failed, we need to reset before continuing. @@ -166,9 +159,6 @@ public synchronized void append(AppendContext appendContext) } } // Append asynchronously for increased throughput. - if (streamWriter.isDone() && !streamWriter.isUserClosed()) { - initialize(streamWriter.getStreamName()); - } ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); @@ -276,4 +266,4 @@ private void done() { } } } -// [END bigquerystorage_jsonstreamwriter_default] +// [END bigquerystorage_jsonstreamwriter_default] \ No newline at end of file From 65cbbf29b172483d245aedaf542ffe5417729691 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:17:44 +0000 Subject: [PATCH 06/11] . --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index fa0da13948..f5f357238a 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -266,4 +266,4 @@ private void done() { } } } -// [END bigquerystorage_jsonstreamwriter_default] \ No newline at end of file +// [END bigquerystorage_jsonstreamwriter_default] From 82e7b827914ba4d0bea7b10d046334d02ca6b852 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:21:26 +0000 Subject: [PATCH 07/11] . --- .../BqToBqStorageSchemaConverter.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java new file mode 100644 index 0000000000..f662112c6a --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java @@ -0,0 +1,88 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableMap; + +/** Converts structure from BigQuery client to BigQueryStorage client */ +public class BqToBqStorageSchemaConverter { + private static ImmutableMap BQTableSchemaModeMap = + ImmutableMap.of( + Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, + Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, + Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); + + private static ImmutableMap BQTableSchemaTypeMap = + new ImmutableMap.Builder() + .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) + .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) + .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) + .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) + .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) + .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) + .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) + .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) + .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) + .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) + .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) + .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) + .build(); + + /** + * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. + * + * @param schema the BigQuery client Table Schema + * @return the bigquery storage API Table Schema + */ + public static TableSchema convertTableSchema(Schema schema) { + TableSchema.Builder result = TableSchema.newBuilder(); + for (int i = 0; i < schema.getFields().size(); i++) { + result.addFields(i, convertFieldSchema(schema.getFields().get(i))); + } + return result.build(); + } + + /** + * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. + * + * @param field the BigQuery client Field Schema + * @return the bigquery storage API Field Schema + */ + public static TableFieldSchema convertFieldSchema(Field field) { + TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); + if (field.getMode() == null) { + field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); + } + result.setMode(BQTableSchemaModeMap.get(field.getMode())); + result.setName(field.getName()); + result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); + if (field.getDescription() != null) { + result.setDescription(field.getDescription()); + } + if (field.getSubFields() != null) { + for (int i = 0; i < field.getSubFields().size(); i++) { + result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); + } + } + return result.build(); + } +} \ No newline at end of file From bd29bd335536fa9823759f859824e0e5a9266ee3 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 01:21:47 +0000 Subject: [PATCH 08/11] . --- .../example/bigquerystorage/BqToBqStorageSchemaConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java index f662112c6a..27c5d1dbcb 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java @@ -85,4 +85,4 @@ public static TableFieldSchema convertFieldSchema(Field field) { } return result.build(); } -} \ No newline at end of file +} From 697836a3b0d45c267d475bf1831d711c03109252 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 08:25:09 +0000 Subject: [PATCH 09/11] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 4 ++-- .../com/google/cloud/bigquery/storage/v1/StreamWriter.java | 2 +- .../cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 4 ++-- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 080e237d05..24061878f2 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -387,8 +387,8 @@ public void close() { * JsonStreamWriter is explicitly closed or the underlying connection is broken when * connection pool is not used. Client should recreate JsonStreamWriter in this case. */ - public boolean isDone() { - return this.streamWriter.isDone(); + public boolean isClosed() { + return this.streamWriter.isClosed(); } /** @return if user explicitly closed the writer. */ diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index bbd9a4914e..0a65c656b4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -425,7 +425,7 @@ public String getLocation() { * StreamWriter is explicitly closed or the underlying connection is broken when connection * pool is not used. Client should recreate StreamWriter in this case. */ - public boolean isDone() { + public boolean isClosed() { if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) { return userClosed.get() || singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index c3f60511c6..286061a795 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -1142,9 +1142,9 @@ public void testWriterId() @Test public void testIsDone() throws DescriptorValidationException, IOException, InterruptedException { JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build(); - Assert.assertFalse(writer.isDone()); + Assert.assertFalse(writer.isClosed()); writer.close(); - Assert.assertTrue(writer.isDone()); + Assert.assertTrue(writer.isClosed()); Assert.assertTrue(writer.isUserClosed()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 2b7ef0151d..1ef8ffdd5b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1257,7 +1257,7 @@ public void testStreamWriterUserCloseNoMultiplexing() throws Exception { StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build(); writer.close(); - assertTrue(writer.isDone()); + assertTrue(writer.isClosed()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ExecutionException ex = assertThrows( @@ -1293,7 +1293,7 @@ public void testStreamWriterPermanentErrorMultiplexing() throws Exception { appendFuture2.get(); }); assertTrue(ex.getCause() instanceof InvalidArgumentException); - assertFalse(writer.isDone()); + assertFalse(writer.isClosed()); assertFalse(writer.isUserClosed()); } @@ -1314,7 +1314,7 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { () -> { appendFuture2.get(); }); - assertTrue(writer.isDone()); + assertTrue(writer.isClosed()); assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isUserClosed()); } From 99d39d63a715cffefd0ddc18f50d756d6d6caefd Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 08:45:45 +0000 Subject: [PATCH 10/11] . --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 1ef8ffdd5b..383301d820 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1237,7 +1237,7 @@ public void testStreamWriterUserCloseMultiplexing() throws Exception { .build(); writer.close(); - assertTrue(writer.isDone()); + assertTrue(writer.isClosed()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ExecutionException ex = assertThrows( From 1a2f504e304b926b04b89e4ffd6a380881418dd8 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 8 Feb 2023 08:57:13 +0000 Subject: [PATCH 11/11] . --- .../clirr-ignored-differences.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index b0d2b7c898..96d4b3d595 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -147,4 +147,15 @@ com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter + boolean isDone() + + + 7002 + com/google/cloud/bigquery/storage/v1/JsonStreamWriter + boolean isDone() + +