Skip to content

Commit e85df79

Browse files
authored
Add Managed Iceberg example (#32678)
* add iceberg example * runtime dependency * update to let the sink create tables * remove dependencies
1 parent eed82f0 commit e85df79

2 files changed

Lines changed: 123 additions & 0 deletions

File tree

examples/java/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ dependencies {
6666
implementation project(":sdks:java:extensions:python")
6767
implementation project(":sdks:java:io:google-cloud-platform")
6868
implementation project(":sdks:java:io:kafka")
69+
runtimeOnly project(":sdks:java:io:iceberg")
70+
implementation project(":sdks:java:managed")
6971
implementation project(":sdks:java:extensions:ml")
7072
implementation library.java.avro
7173
implementation library.java.bigdataoss_util
@@ -100,6 +102,8 @@ dependencies {
100102
implementation "org.apache.httpcomponents:httpcore:4.4.13"
101103
implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1"
102104
implementation "com.fasterxml.jackson.core:jackson-core:2.14.1"
105+
runtimeOnly library.java.hadoop_client
106+
runtimeOnly library.java.bigdataoss_gcs_connector
103107
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
104108
testImplementation project(":sdks:java:io:google-cloud-platform")
105109
testImplementation project(":sdks:java:extensions:ml")
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.cookbook;
19+
20+
import java.util.Arrays;
21+
import java.util.Map;
22+
import org.apache.beam.sdk.Pipeline;
23+
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
24+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
25+
import org.apache.beam.sdk.managed.Managed;
26+
import org.apache.beam.sdk.options.Default;
27+
import org.apache.beam.sdk.options.Description;
28+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
29+
import org.apache.beam.sdk.options.Validation;
30+
import org.apache.beam.sdk.schemas.Schema;
31+
import org.apache.beam.sdk.schemas.transforms.Filter;
32+
import org.apache.beam.sdk.transforms.JsonToRow;
33+
import org.apache.beam.sdk.values.Row;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
35+
36+
/**
37+
* Reads real-time NYC taxi ride information from {@code
38+
* projects/pubsub-public-data/topics/taxirides-realtime} and writes to Iceberg tables using Beam's
39+
* {@link Managed} IcebergIO sink.
40+
*
41+
* <p>This is a streaming pipeline that writes records to Iceberg tables dynamically, depending on
42+
* each record's passenger count. New tables are created as needed. We set a triggering frequency of
43+
* 10s; at around this interval, the sink will accumulate records and write them to the appropriate
44+
* table, creating a new snapshot each time.
45+
*/
46+
public class IcebergTaxiExamples {
47+
private static final String TAXI_RIDES_TOPIC =
48+
"projects/pubsub-public-data/topics/taxirides-realtime";
49+
private static final Schema TAXI_RIDE_INFO_SCHEMA =
50+
Schema.builder()
51+
.addStringField("ride_id")
52+
.addInt32Field("point_idx")
53+
.addDoubleField("latitude")
54+
.addDoubleField("longitude")
55+
.addStringField("timestamp")
56+
.addDoubleField("meter_reading")
57+
.addDoubleField("meter_increment")
58+
.addStringField("ride_status")
59+
.addInt32Field("passenger_count")
60+
.build();
61+
62+
public static void main(String[] args) {
63+
IcebergPipelineOptions options =
64+
PipelineOptionsFactory.fromArgs(args).as(IcebergPipelineOptions.class);
65+
options.setProject("apache-beam-testing");
66+
67+
// each record's 'passenger_count' value will be substituted in to determine
68+
// its final table destination
69+
// e.g. an event with 3 passengers will be written to 'iceberg_taxi.3_passengers'
70+
String tableIdentifierTemplate = "iceberg_taxi.{passenger_count}_passengers";
71+
72+
Map<String, String> catalogProps =
73+
ImmutableMap.<String, String>builder()
74+
.put("catalog-impl", options.getCatalogImpl())
75+
.put("warehouse", options.getWarehouse())
76+
.build();
77+
Map<String, Object> icebergWriteConfig =
78+
ImmutableMap.<String, Object>builder()
79+
.put("table", tableIdentifierTemplate)
80+
.put("catalog_name", options.getCatalogName())
81+
.put("catalog_properties", catalogProps)
82+
.put("triggering_frequency_seconds", 10)
83+
// perform a final filter to only write these two columns
84+
.put("keep", Arrays.asList("ride_id", "meter_reading"))
85+
.build();
86+
87+
Pipeline p = Pipeline.create(options);
88+
p
89+
// Read taxi ride data
90+
.apply(PubsubIO.readStrings().fromTopic(TAXI_RIDES_TOPIC))
91+
// Convert JSON strings to Beam Rows
92+
.apply(JsonToRow.withSchema(TAXI_RIDE_INFO_SCHEMA))
93+
// Filter to only include drop-offs
94+
.apply(Filter.<Row>create().whereFieldName("ride_status", "dropoff"::equals))
95+
// Write to Iceberg tables
96+
.apply(Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
97+
p.run();
98+
}
99+
100+
public interface IcebergPipelineOptions extends GcpOptions {
101+
@Description("Warehouse location where the table's data will be written to.")
102+
@Default.String("gs://apache-beam-samples/iceberg-examples")
103+
String getWarehouse();
104+
105+
void setWarehouse(String warehouse);
106+
107+
@Description("Fully-qualified name of the catalog class to use.")
108+
@Default.String("org.apache.iceberg.hadoop.HadoopCatalog")
109+
String getCatalogImpl();
110+
111+
void setCatalogImpl(String catalogName);
112+
113+
@Validation.Required
114+
@Default.String("example-catalog")
115+
String getCatalogName();
116+
117+
void setCatalogName(String catalogName);
118+
}
119+
}

0 commit comments

Comments
 (0)