Skip to content

Commit d10249d

Browse files
docs(samples): Add snippets for Apache Iceberg using Managed I/O (GoogleCloudPlatform#9339)
* Add snippets for Apache Iceberg using Managed I/O * Fix region tag * Fix dependency * Address review feedback * Add comment about Schema type in IT
1 parent 34c4c01 commit d10249d

File tree

4 files changed

+422
-1
lines changed

4 files changed

+422
-1
lines changed

dataflow/snippets/pom.xml

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
<maven.compiler.source>11</maven.compiler.source>
3838
<maven.compiler.target>11</maven.compiler.target>
3939
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
40-
<apache_beam.version>2.54.0</apache_beam.version>
40+
<apache_beam.version>2.56.0</apache_beam.version>
4141
<slf4j.version>2.0.12</slf4j.version>
42+
<parquet.version>1.14.0</parquet.version>
43+
<iceberg.version>1.4.2</iceberg.version>
4244
</properties>
4345

4446
<build>
@@ -119,6 +121,41 @@
119121
</exclusions>
120122
</dependency>
121123

124+
125+
<!-- Managed I/O -->
126+
<dependency>
127+
<groupId>org.apache.beam</groupId>
128+
<artifactId>beam-sdks-java-managed</artifactId>
129+
<version>${apache_beam.version}</version>
130+
</dependency>
131+
132+
<!-- Apache Iceberg I/O -->
133+
<dependency>
134+
<groupId>org.apache.beam</groupId>
135+
<artifactId>beam-sdks-java-io-iceberg</artifactId>
136+
<version>${apache_beam.version}</version>
137+
</dependency>
138+
<dependency>
139+
<groupId>org.apache.parquet</groupId>
140+
<artifactId>parquet-column</artifactId>
141+
<version>${parquet.version}</version>
142+
</dependency>
143+
<dependency>
144+
<groupId>org.apache.parquet</groupId>
145+
<artifactId>parquet-hadoop</artifactId>
146+
<version>${parquet.version}</version>
147+
</dependency>
148+
<dependency>
149+
<groupId>org.apache.hadoop</groupId>
150+
<artifactId>hadoop-client-runtime</artifactId>
151+
<version>3.4.0</version>
152+
</dependency>
153+
<dependency>
154+
<groupId>org.apache.iceberg</groupId>
155+
<artifactId>iceberg-data</artifactId>
156+
<version>${iceberg.version}</version>
157+
</dependency>
158+
122159
<!-- Google Cloud -->
123160
<dependency>
124161
<groupId>com.google.cloud</groupId>
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_apache_iceberg_read]
20+
import com.google.common.collect.ImmutableMap;
21+
import java.util.Map;
22+
import org.apache.beam.sdk.Pipeline;
23+
import org.apache.beam.sdk.io.TextIO;
24+
import org.apache.beam.sdk.managed.Managed;
25+
import org.apache.beam.sdk.options.Description;
26+
import org.apache.beam.sdk.options.PipelineOptions;
27+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
28+
import org.apache.beam.sdk.transforms.MapElements;
29+
import org.apache.beam.sdk.values.PCollectionRowTuple;
30+
import org.apache.beam.sdk.values.TypeDescriptors;
31+
32+
public class ApacheIcebergRead {
33+
34+
static final String CATALOG_TYPE = "hadoop";
35+
36+
public interface Options extends PipelineOptions {
37+
@Description("The URI of the Apache Iceberg warehouse location")
38+
String getWarehouseLocation();
39+
40+
void setWarehouseLocation(String value);
41+
42+
@Description("Path to write the output file")
43+
String getOutputPath();
44+
45+
void setOutputPath(String value);
46+
47+
@Description("The name of the Apache Iceberg catalog")
48+
String getCatalogName();
49+
50+
void setCatalogName(String value);
51+
52+
@Description("The name of the table to write to")
53+
String getTableName();
54+
55+
void setTableName(String value);
56+
}
57+
58+
public static void main(String[] args) {
59+
60+
// Parse the pipeline options passed into the application. Example:
61+
// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
62+
// -tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
63+
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
64+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
65+
Pipeline pipeline = Pipeline.create(options);
66+
67+
// Configure the Iceberg source I/O
68+
Map catalogConfig = ImmutableMap.<String, Object>builder()
69+
.put("catalog_name", options.getCatalogName())
70+
.put("warehouse_location", options.getWarehouseLocation())
71+
.put("catalog_type", CATALOG_TYPE)
72+
.build();
73+
74+
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
75+
.put("table", options.getTableName())
76+
.put("catalog_config", catalogConfig)
77+
.build();
78+
79+
// Build the pipeline.
80+
PCollectionRowTuple.empty(pipeline).apply(
81+
Managed.read(Managed.ICEBERG)
82+
.withConfig(config)
83+
)
84+
.get("output")
85+
// Format each record as a string with the format 'id:name'.
86+
.apply(MapElements
87+
.into(TypeDescriptors.strings())
88+
.via((row -> {
89+
return String.format("%d:%s",
90+
row.getInt64("id"),
91+
row.getString("name"));
92+
})))
93+
// Write to a text file.
94+
.apply(
95+
TextIO.write()
96+
.to(options.getOutputPath())
97+
.withNumShards(1)
98+
.withSuffix(".txt"));
99+
100+
pipeline.run().waitUntilFinish();
101+
}
102+
}
103+
// [END dataflow_apache_iceberg_read]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_apache_iceberg_write]
20+
import com.google.common.collect.ImmutableMap;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.Map;
24+
import org.apache.beam.sdk.Pipeline;
25+
import org.apache.beam.sdk.managed.Managed;
26+
import org.apache.beam.sdk.options.Description;
27+
import org.apache.beam.sdk.options.PipelineOptions;
28+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
29+
import org.apache.beam.sdk.schemas.Schema;
30+
import org.apache.beam.sdk.transforms.Create;
31+
import org.apache.beam.sdk.transforms.JsonToRow;
32+
import org.apache.beam.sdk.values.PCollectionRowTuple;
33+
34+
public class ApacheIcebergWrite {
35+
static final List<String> TABLE_ROWS = Arrays.asList(
36+
"{\"id\":0, \"name\":\"Alice\"}",
37+
"{\"id\":1, \"name\":\"Bob\"}",
38+
"{\"id\":2, \"name\":\"Charles\"}"
39+
);
40+
41+
static final String CATALOG_TYPE = "hadoop";
42+
43+
// The schema for the table rows.
44+
public static final Schema SCHEMA = new Schema.Builder()
45+
.addStringField("name")
46+
.addInt64Field("id")
47+
.build();
48+
49+
public interface Options extends PipelineOptions {
50+
@Description("The URI of the Apache Iceberg warehouse location")
51+
String getWarehouseLocation();
52+
53+
void setWarehouseLocation(String value);
54+
55+
@Description("The name of the Apache Iceberg catalog")
56+
String getCatalogName();
57+
58+
void setCatalogName(String value);
59+
60+
@Description("The name of the table to write to")
61+
String getTableName();
62+
63+
void setTableName(String value);
64+
}
65+
66+
public static void main(String[] args) {
67+
68+
// Parse the pipeline options passed into the application. Example:
69+
// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
70+
// --tableName= $TABLE_NAME
71+
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
72+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
73+
Pipeline pipeline = Pipeline.create(options);
74+
75+
// Configure the Iceberg source I/O
76+
Map catalogConfig = ImmutableMap.<String, Object>builder()
77+
.put("catalog_name", options.getCatalogName())
78+
.put("warehouse_location", options.getWarehouseLocation())
79+
.put("catalog_type", CATALOG_TYPE)
80+
.build();
81+
82+
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
83+
.put("table", options.getTableName())
84+
.put("catalog_config", catalogConfig)
85+
.build();
86+
87+
// Build the pipeline.
88+
var input = pipeline
89+
.apply(Create.of(TABLE_ROWS))
90+
.apply(JsonToRow.withSchema(SCHEMA));
91+
92+
PCollectionRowTuple.of("input", input).apply(
93+
Managed.write(Managed.ICEBERG)
94+
.withConfig(config)
95+
);
96+
97+
pipeline.run().waitUntilFinish();
98+
}
99+
}
100+
// [END dataflow_apache_iceberg_write]

0 commit comments

Comments
 (0)