Skip to content

Commit 19e7c0f

Browse files
authored
[yaml] - add bigquery destination test (#37787)
* add bigquery destination test * try postcommit test * comment out blocked writeToBQ part * add tmp location * add BQ project detect and test
1 parent 72749a2 commit 19e7c0f

4 files changed

Lines changed: 177 additions & 6 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"revision": 6
3+
"revision": 8
44
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,27 @@ public List<BigQueryStorageStreamSource<T>> split(
156156
streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
157157
}
158158

159+
String project =
160+
bqOptions.getBigQueryProject() == null
161+
? bqOptions.getProject()
162+
: bqOptions.getBigQueryProject();
163+
if (project == null) {
164+
if (targetTable != null
165+
&& targetTable.getTableReference() != null
166+
&& targetTable.getTableReference().getProjectId() != null) {
167+
project = targetTable.getTableReference().getProjectId();
168+
} else {
169+
@Nullable String tableReferenceId = getTargetTableId(bqOptions);
170+
if (tableReferenceId != null) {
171+
TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId);
172+
project = tableReference.getProjectId();
173+
}
174+
}
175+
}
176+
159177
CreateReadSessionRequest createReadSessionRequest =
160178
CreateReadSessionRequest.newBuilder()
161-
.setParent(
162-
BigQueryHelpers.toProjectResourceName(
163-
bqOptions.getBigQueryProject() == null
164-
? bqOptions.getProject()
165-
: bqOptions.getBigQueryProject()))
179+
.setParent(BigQueryHelpers.toProjectResourceName(project))
166180
.setReadSession(readSessionBuilder)
167181
.setMaxStreamCount(streamCount)
168182
.build();

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,6 +1600,66 @@ public void testReadFromBigQueryIO() throws Exception {
16001600
p.run();
16011601
}
16021602

1603+
@Test
1604+
public void testReadFromBigQueryIOWithFallbackProject() throws Exception {
1605+
fakeDatasetService.createDataset("fallback-project", "dataset", "", "", null);
1606+
TableReference tableRef = BigQueryHelpers.parseTableSpec("fallback-project:dataset.table");
1607+
Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
1608+
fakeDatasetService.createTable(table);
1609+
1610+
CreateReadSessionRequest expectedCreateReadSessionRequest =
1611+
CreateReadSessionRequest.newBuilder()
1612+
.setParent("projects/fallback-project")
1613+
.setReadSession(
1614+
ReadSession.newBuilder()
1615+
.setTable("projects/fallback-project/datasets/dataset/tables/table")
1616+
.setDataFormat(DataFormat.AVRO)
1617+
.setReadOptions(ReadSession.TableReadOptions.newBuilder()))
1618+
.setMaxStreamCount(10)
1619+
.build();
1620+
1621+
ReadSession readSession =
1622+
ReadSession.newBuilder()
1623+
.setName("readSessionName")
1624+
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
1625+
.addStreams(ReadStream.newBuilder().setName("streamName"))
1626+
.setDataFormat(DataFormat.AVRO)
1627+
.build();
1628+
1629+
ReadRowsRequest expectedReadRowsRequest =
1630+
ReadRowsRequest.newBuilder().setReadStream("streamName").build();
1631+
1632+
List<GenericRecord> records = Lists.newArrayList(createRecord("A", 1, AVRO_SCHEMA));
1633+
1634+
List<ReadRowsResponse> readRowsResponses =
1635+
Lists.newArrayList(createResponse(AVRO_SCHEMA, records.subList(0, 1), 0.0, 1.0));
1636+
1637+
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
1638+
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
1639+
.thenReturn(readSession);
1640+
when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
1641+
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
1642+
1643+
// Explicitly set the pipeline's project option to null to simulate missing
1644+
// cross-language parameters, and verify it uses the project from the TableReference.
1645+
options.as(BigQueryOptions.class).setProject(null);
1646+
1647+
PCollection<KV<String, Long>> output =
1648+
p.apply(
1649+
BigQueryIO.read(new ParseKeyValue())
1650+
.from("fallback-project:dataset.table")
1651+
.withMethod(Method.DIRECT_READ)
1652+
.withFormat(DataFormat.AVRO)
1653+
.withTestServices(
1654+
new FakeBigQueryServices()
1655+
.withDatasetService(fakeDatasetService)
1656+
.withStorageClient(fakeStorageClient)));
1657+
1658+
PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("A", 1L)));
1659+
1660+
p.run();
1661+
}
1662+
16031663
@Test
16041664
public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
16051665
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
19+
- name: BQ_TABLE
20+
type: "apache_beam.yaml.integration_tests.temp_bigquery_table"
21+
config:
22+
project: "apache-beam-testing"
23+
- name: TEMP_DIR_0
24+
# Need distributed filesystem to be able to read and write from a container.
25+
type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
26+
config:
27+
bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"
28+
29+
pipelines:
30+
- pipeline:
31+
type: chain
32+
transforms:
33+
- type: Create
34+
name: CreateUsers
35+
config:
36+
elements:
37+
- {id: 1, name: "Alice", country: "US"}
38+
- {id: 2, name: "Bob", country: "UK"}
39+
- {id: 3, name: "Charlie", country: "CN"}
40+
- {id: 4, name: "David", country: "US"}
41+
- type: WriteToBigQuery
42+
name: WriteWithDynamicDestinations
43+
config:
44+
# NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN
45+
# This is because the table name is dynamically generated using the country field.
46+
# The {{country}} syntax is used to dynamically generate the table name.
47+
# For this testing example we are using {{country}} to dynamically generate the table name.
48+
# In production, we would use {country} to dynamically generate the table name.
49+
table: "{BQ_TABLE}_{{country}}"
50+
create_disposition: CREATE_IF_NEEDED
51+
write_disposition: WRITE_APPEND
52+
options:
53+
project: "apache-beam-testing"
54+
temp_location: "{TEMP_DIR_0}"
55+
56+
- pipeline:
57+
type: chain
58+
transforms:
59+
- type: ReadFromBigQuery
60+
config:
61+
table: "{BQ_TABLE}_US"
62+
- type: AssertEqual
63+
config:
64+
elements:
65+
- {id: 1, name: "Alice", country: "US"}
66+
- {id: 4, name: "David", country: "US"}
67+
options:
68+
project: "apache-beam-testing"
69+
temp_location: "{TEMP_DIR_0}"
70+
71+
- pipeline:
72+
type: chain
73+
transforms:
74+
- type: ReadFromBigQuery
75+
config:
76+
table: "{BQ_TABLE}_UK"
77+
- type: AssertEqual
78+
config:
79+
elements:
80+
- {id: 2, name: "Bob", country: "UK"}
81+
options:
82+
project: "apache-beam-testing"
83+
temp_location: "{TEMP_DIR_0}"
84+
85+
- pipeline:
86+
type: chain
87+
transforms:
88+
- type: ReadFromBigQuery
89+
config:
90+
table: "{BQ_TABLE}_CN"
91+
- type: AssertEqual
92+
config:
93+
elements:
94+
- {id: 3, name: "Charlie", country: "CN"}
95+
options:
96+
project: "apache-beam-testing"
97+
temp_location: "{TEMP_DIR_0}"

0 commit comments

Comments
 (0)