Skip to content

Commit 2576fa2

Browse files
authored
Add snippets for JDBC using Managed I/O (GoogleCloudPlatform#10239)
* Fix missing-symbol error while upgrading Beam to 2.69.
* Add an example for reading from Postgres via Managed I/O.
* Add an example for writing to Postgres via Managed I/O.
* Reformat using google-java-format.
* Minor change to table names.
* Fix Iceberg test due to the Beam version upgrade.
* Address reviews.
* Address comments from reviewers.
* Formatting.
1 parent f26a97e commit 2576fa2

File tree

6 files changed

+444
-5
lines changed

6 files changed

+444
-5
lines changed

dataflow/snippets/pom.xml

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@
3737
<maven.compiler.source>11</maven.compiler.source>
3838
<maven.compiler.target>11</maven.compiler.target>
3939
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
40-
<apache_beam.version>2.67.0</apache_beam.version>
40+
<apache_beam.version>2.70.0</apache_beam.version>
4141
<slf4j.version>2.0.12</slf4j.version>
42-
<parquet.version>1.14.0</parquet.version>
43-
<iceberg.version>1.4.2</iceberg.version>
42+
<parquet.version>1.16.0</parquet.version>
43+
<iceberg.version>1.10.0</iceberg.version>
44+
<postgresql.version>42.7.3</postgresql.version>
45+
<testcontainers.version>1.20.0</testcontainers.version>
4446
</properties>
4547

4648
<build>
@@ -65,6 +67,13 @@
6567

6668
<dependencyManagement>
6769
<dependencies>
70+
<!-- Force Avro version for everything. -->
71+
<dependency>
72+
<groupId>org.apache.avro</groupId>
73+
<artifactId>avro</artifactId>
74+
<!-- `LogicalTypes.TimestampNanos` was introduced at 1.12.0. -->
75+
<version>1.12.0</version>
76+
</dependency>
6877
<dependency>
6978
<groupId>com.google.cloud</groupId>
7079
<artifactId>libraries-bom</artifactId>
@@ -160,6 +169,11 @@
160169
<artifactId>iceberg-gcp</artifactId>
161170
<version>${iceberg.version}</version>
162171
</dependency>
172+
<dependency>
173+
<groupId>org.apache.iceberg</groupId>
174+
<artifactId>iceberg-parquet</artifactId>
175+
<version>${iceberg.version}</version>
176+
</dependency>
163177

164178
<!-- Kafka -->
165179
<dependency>
@@ -181,6 +195,27 @@
181195
<scope>test</scope>
182196
</dependency>
183197

198+
<!-- Postgres -->
199+
<dependency>
200+
<groupId>org.apache.beam</groupId>
201+
<artifactId>beam-sdks-java-io-jdbc</artifactId>
202+
<version>${apache_beam.version}</version>
203+
</dependency>
204+
<dependency>
205+
<!-- For preparing and verifying a test table in the integration test -->
206+
<groupId>org.postgresql</groupId>
207+
<artifactId>postgresql</artifactId>
208+
<version>${postgresql.version}</version>
209+
<scope>test</scope>
210+
</dependency>
211+
<dependency>
212+
<!-- For running containerized Postgres instance in the integration test -->
213+
<groupId>org.testcontainers</groupId>
214+
<artifactId>postgresql</artifactId>
215+
<version>${testcontainers.version}</version>
216+
<scope>test</scope>
217+
</dependency>
218+
184219
<!-- Google Cloud -->
185220
<dependency>
186221
<groupId>com.google.cloud</groupId>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_postgres_read]
20+
import com.google.common.collect.ImmutableMap;
21+
import org.apache.beam.sdk.Pipeline;
22+
import org.apache.beam.sdk.PipelineResult;
23+
import org.apache.beam.sdk.io.TextIO;
24+
import org.apache.beam.sdk.managed.Managed;
25+
import org.apache.beam.sdk.options.Description;
26+
import org.apache.beam.sdk.options.PipelineOptions;
27+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
28+
import org.apache.beam.sdk.transforms.MapElements;
29+
import org.apache.beam.sdk.values.TypeDescriptors;
30+
31+
public class PostgresRead {
32+
33+
public interface Options extends PipelineOptions {
34+
@Description("The JDBC URL of the PostgreSQL database to read from.")
35+
String getJdbcUrl();
36+
37+
void setJdbcUrl(String value);
38+
39+
@Description("The PostgresSQL table to read from.")
40+
String getTable();
41+
42+
void setTable(String value);
43+
44+
@Description("The username for the PostgreSQL database.")
45+
String getUsername();
46+
47+
void setUsername(String value);
48+
49+
@Description("The password for the PostgreSQL database.")
50+
String getPassword();
51+
52+
void setPassword(String value);
53+
54+
@Description(
55+
"The path to write the output file. Can be a local file path, "
56+
+ "a GCS path, or a path to any other supported file systems.")
57+
String getOutputPath();
58+
59+
void setOutputPath(String value);
60+
}
61+
62+
public static PipelineResult.State main(String[] args) {
63+
// Parse the pipeline options passed into the application. Example:
64+
// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
65+
// --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE
66+
// For more information, see
67+
// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
68+
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
69+
Pipeline pipeline = createPipeline(options);
70+
return pipeline.run().waitUntilFinish();
71+
}
72+
73+
public static Pipeline createPipeline(Options options) {
74+
75+
// Create configuration parameters for the Managed I/O transform.
76+
ImmutableMap<String, Object> config =
77+
ImmutableMap.<String, Object>builder()
78+
.put("jdbc_url", options.getJdbcUrl())
79+
.put("location", options.getTable())
80+
.put("username", options.getUsername())
81+
.put("password", options.getPassword())
82+
.build();
83+
84+
// Build the pipeline.
85+
var pipeline = Pipeline.create(options);
86+
pipeline
87+
// Read data from a Postgres database using Managed I/O.
88+
.apply(Managed.read(Managed.POSTGRES).withConfig(config))
89+
.getSinglePCollection()
90+
// Convert each row to a string.
91+
.apply(
92+
MapElements.into(TypeDescriptors.strings())
93+
.via((row -> String.format("%d,%s", row.getInt32("id"), row.getString("name")))))
94+
// Write strings to a text file.
95+
.apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1));
96+
return pipeline;
97+
}
98+
}
99+
// [END dataflow_postgres_read]
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_postgres_write]
20+
import static org.apache.beam.sdk.schemas.Schema.toSchema;
21+
22+
import com.google.common.collect.ImmutableMap;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.stream.Stream;
26+
import org.apache.beam.sdk.Pipeline;
27+
import org.apache.beam.sdk.PipelineResult;
28+
import org.apache.beam.sdk.managed.Managed;
29+
import org.apache.beam.sdk.options.Description;
30+
import org.apache.beam.sdk.options.PipelineOptions;
31+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
32+
import org.apache.beam.sdk.schemas.Schema;
33+
import org.apache.beam.sdk.transforms.Create;
34+
import org.apache.beam.sdk.values.Row;
35+
36+
public class PostgresWrite {
37+
38+
private static Schema INPUT_SCHEMA =
39+
Stream.of(
40+
Schema.Field.of("id", Schema.FieldType.INT32),
41+
Schema.Field.of("name", Schema.FieldType.STRING))
42+
.collect(toSchema());
43+
44+
private static List<Row> ROWS =
45+
Arrays.asList(
46+
Row.withSchema(INPUT_SCHEMA)
47+
.withFieldValue("id", 1)
48+
.withFieldValue("name", "John Doe")
49+
.build(),
50+
Row.withSchema(INPUT_SCHEMA)
51+
.withFieldValue("id", 2)
52+
.withFieldValue("name", "Jane Smith")
53+
.build());
54+
55+
public interface Options extends PipelineOptions {
56+
@Description("The JDBC URL of the PostgreSQL database to write to.")
57+
String getJdbcUrl();
58+
59+
void setJdbcUrl(String value);
60+
61+
@Description("The PostgresSQL table to write to.")
62+
String getTable();
63+
64+
void setTable(String value);
65+
66+
@Description("The username for the PostgreSQL database.")
67+
String getUsername();
68+
69+
void setUsername(String value);
70+
71+
@Description("The password for the PostgreSQL database.")
72+
String getPassword();
73+
74+
void setPassword(String value);
75+
}
76+
77+
public static PipelineResult.State main(String[] args) {
78+
// Parse the pipeline options passed into the application. Example:
79+
// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
80+
// --username=$USERNAME --password=$PASSWORD
81+
// For more information, see
82+
// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
83+
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
84+
Pipeline pipeline = createPipeline(options);
85+
return pipeline.run().waitUntilFinish();
86+
}
87+
88+
public static Pipeline createPipeline(Options options) {
89+
90+
// Create configuration parameters for the Managed I/O transform.
91+
ImmutableMap<String, Object> config =
92+
ImmutableMap.<String, Object>builder()
93+
.put("jdbc_url", options.getJdbcUrl())
94+
.put("location", options.getTable())
95+
.put("username", options.getUsername())
96+
.put("password", options.getPassword())
97+
.build();
98+
99+
// Build the pipeline.
100+
var pipeline = Pipeline.create(options);
101+
pipeline
102+
// Create data to write to Postgres.
103+
.apply(Create.of(ROWS))
104+
.setRowSchema(INPUT_SCHEMA)
105+
// Write data to a Postgres database using Managed I/O.
106+
.apply(Managed.write(Managed.POSTGRES).withConfig(config))
107+
.getSinglePCollection();
108+
return pipeline;
109+
}
110+
}
111+
// [END dataflow_postgres_write]

dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void writeTableRecord(Table table)
9898

9999
FileAppender<Record> appender =
100100
Parquet.write(HadoopOutputFile.fromPath(path, hadoopConf))
101-
.createWriterFunc(GenericParquetWriter::buildWriter)
101+
.createWriterFunc(msgType -> GenericParquetWriter.create(table.schema(), msgType))
102102
.schema(table.schema())
103103
.overwrite()
104104
.build();
@@ -166,7 +166,7 @@ public void tearDown() throws IOException, ExecutionException, InterruptedExcept
166166
RemoteStorageHelper.forceDelete(storage, bucketName, 1, TimeUnit.MINUTES);
167167
}
168168
}
169-
169+
170170
@Test
171171
public void testApacheIcebergRestCatalog() throws IOException, InterruptedException {
172172
String warehouse = "gs://" + bucketName;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.nio.file.Files;
25+
import java.nio.file.Paths;
26+
import java.sql.Connection;
27+
import java.sql.DriverManager;
28+
import java.sql.Statement;
29+
import org.apache.beam.sdk.PipelineResult;
30+
import org.junit.After;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
import org.testcontainers.containers.PostgreSQLContainer;
34+
35+
public class PostgresReadIT {
36+
37+
private static final String TABLE_NAME = "test_read_table";
38+
private static final String OUTPUT_PATH = "test-output";
39+
// The TextIO connector appends this suffix to the pipeline output file.
40+
private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt";
41+
private static final String OUTPUT_FILE_NAME = OUTPUT_PATH + OUTPUT_FILE_SUFFIX;
42+
43+
private static final PostgreSQLContainer<?> postgres =
44+
new PostgreSQLContainer<>("postgres:15-alpine");
45+
46+
@Before
47+
public void setUp() throws Exception {
48+
postgres.start();
49+
50+
// Initialize the database with table and data.
51+
try (Connection conn =
52+
DriverManager.getConnection(
53+
postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {
54+
55+
Statement stmt = conn.createStatement();
56+
stmt.execute(
57+
String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME));
58+
stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME));
59+
stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME));
60+
}
61+
}
62+
63+
@After
64+
public void tearDown() throws IOException {
65+
if (postgres != null) {
66+
postgres.stop();
67+
}
68+
Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
69+
}
70+
71+
@Test
72+
public void testPostgresRead() throws IOException {
73+
// Execute the Beam pipeline.
74+
PipelineResult.State state =
75+
PostgresRead.main(
76+
new String[] {
77+
"--runner=DirectRunner",
78+
"--jdbcUrl=" + postgres.getJdbcUrl(),
79+
"--table=" + TABLE_NAME,
80+
"--username=" + postgres.getUsername(),
81+
"--password=" + postgres.getPassword(),
82+
"--outputPath=" + OUTPUT_PATH
83+
});
84+
85+
assertEquals(PipelineResult.State.DONE, state);
86+
verifyOutput();
87+
}
88+
89+
private void verifyOutput() throws IOException {
90+
File outputFile = new File(OUTPUT_FILE_NAME);
91+
assertTrue("Output file should exist", outputFile.exists());
92+
93+
String content = Files.readString(Paths.get(OUTPUT_FILE_NAME));
94+
95+
assertTrue(content.contains("1,John Doe"));
96+
assertTrue(content.contains("2,Jane Smith"));
97+
}
98+
}

0 commit comments

Comments
 (0)