Skip to content

Commit 0331f20

Browse files
authored
Add Example for Iceberg REST Catalog CDC (#35649)
* Add Example for Iceberg REST Catalog CDC * fix constants * Add more Iceberg feature demos * fix spotless * change table name to metrics * Add ability to trigger full functionality from CDC * make topic name final
1 parent fb1fab0 commit 0331f20

2 files changed

Lines changed: 401 additions & 69 deletions

File tree

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.cookbook;
19+
20+
import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC;
21+
22+
import com.google.auth.oauth2.GoogleCredentials;
23+
import java.io.IOException;
24+
import java.util.Map;
25+
import org.apache.beam.sdk.Pipeline;
26+
import org.apache.beam.sdk.coders.RowCoder;
27+
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
28+
import org.apache.beam.sdk.managed.Managed;
29+
import org.apache.beam.sdk.options.Default;
30+
import org.apache.beam.sdk.options.Description;
31+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
32+
import org.apache.beam.sdk.schemas.Schema;
33+
import org.apache.beam.sdk.transforms.DoFn;
34+
import org.apache.beam.sdk.transforms.ParDo;
35+
import org.apache.beam.sdk.transforms.Sum;
36+
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
37+
import org.apache.beam.sdk.transforms.windowing.Window;
38+
import org.apache.beam.sdk.util.Preconditions;
39+
import org.apache.beam.sdk.values.KV;
40+
import org.apache.beam.sdk.values.PCollection;
41+
import org.apache.beam.sdk.values.Row;
42+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
43+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
44+
import org.joda.time.DateTime;
45+
import org.joda.time.Duration;
46+
47+
/**
48+
* This pipeline demonstrates how to read a continuous stream of change data capture (CDC) events
49+
* from an Apache Iceberg table. It processes these events to calculate the hourly total of
50+
* passengers and writes the aggregated results into a new Iceberg table.
51+
*
52+
* <p>This pipeline can be used to process the output of {@link
53+
* IcebergRestCatalogStreamingWriteExample}.
54+
*
55+
* <p>This pipeline also includes a flag {@code --triggerStreamingWrite} which, when enabled, will
56+
* start the {@link IcebergRestCatalogStreamingWriteExample} in a separate thread to populate the
57+
 * source table. This is useful for executing the end-to-end functionality.
58+
*
59+
* <p>This example is a demonstration of the Iceberg REST Catalog. For more information, see the
60+
 * documentation at <a href="https://cloud.google.com/bigquery/docs/blms-rest-catalog">the BigLake REST catalog guide</a>.
61+
*
62+
* <p>For more information on Apache Beam Iceberg Managed-IO features, see the documentation at
63+
 * <a href="https://beam.apache.org/documentation/io/managed-io/">the Managed I/O page</a>.
64+
*/
65+
public class IcebergRestCatalogCDCExample {
66+
67+
// Schema for the source table containing minute-level aggregated data
68+
public static final Schema SOURCE_SCHEMA =
69+
Schema.builder().addDateTimeField("ride_minute").addInt64Field("passenger_count").build();
70+
71+
// Schema for the destination table containing hourly aggregated data
72+
public static final Schema HOURLY_PASSENGER_COUNT_SCHEMA =
73+
Schema.builder().addDateTimeField("ride_hour").addInt64Field("passenger_count").build();
74+
75+
public static void main(String[] args) throws IOException {
76+
IcebergCdcOptions options =
77+
PipelineOptionsFactory.fromArgs(args).withValidation().as(IcebergCdcOptions.class);
78+
79+
if (options.getTriggerStreamingWrite()) {
80+
new Thread(
81+
() -> {
82+
try {
83+
IcebergRestCatalogStreamingWriteExample.main(
84+
new String[] {
85+
"--runner=" + options.getRunner().getSimpleName(),
86+
"--project=" + options.getProject(),
87+
"--icebergTable=" + options.getSourceTable(),
88+
"--catalogUri=" + options.getCatalogUri(),
89+
"--warehouse=" + options.getWarehouse(),
90+
"--catalogName=" + options.getCatalogName()
91+
});
92+
} catch (IOException e) {
93+
throw new RuntimeException(e);
94+
}
95+
})
96+
.start();
97+
}
98+
99+
final String sourceTable = options.getSourceTable();
100+
final String destinationTable = options.getDestinationTable();
101+
final String catalogUri = options.getCatalogUri();
102+
final String warehouseLocation = options.getWarehouse();
103+
final String projectName = options.getProject();
104+
final String catalogName = options.getCatalogName();
105+
final int pollIntervalSeconds = 120;
106+
final int triggeringFrequencySeconds = 30;
107+
108+
// Note: The token expires in 1 hour. Users may need to re-run the pipeline.
109+
// Future updates to Iceberg and the BigLake Metastore will support token refreshing.
110+
Map<String, String> catalogProps =
111+
ImmutableMap.<String, String>builder()
112+
.put("type", "rest")
113+
.put("uri", catalogUri)
114+
.put("warehouse", warehouseLocation)
115+
.put("header.x-goog-user-project", projectName)
116+
.put("oauth2-server-uri", "https://oauth2.googleapis.com/token")
117+
.put(
118+
"token",
119+
GoogleCredentials.getApplicationDefault().refreshAccessToken().getTokenValue())
120+
.put("rest-metrics-reporting-enabled", "false")
121+
.build();
122+
123+
Pipeline p = Pipeline.create(options);
124+
125+
// Configure the Iceberg CDC read
126+
Map<String, Object> icebergReadConfig =
127+
ImmutableMap.<String, Object>builder()
128+
.put("table", sourceTable)
129+
.put("filter", "\"ride_minute\" IS NOT NULL AND \"passenger_count\" IS NOT NULL")
130+
.put("keep", ImmutableList.of("ride_minute", "passenger_count"))
131+
.put("catalog_name", catalogName)
132+
.put("catalog_properties", catalogProps)
133+
.put("streaming", true)
134+
.put("poll_interval_seconds", pollIntervalSeconds)
135+
.build();
136+
137+
// Read CDC events from the source Iceberg table
138+
PCollection<Row> cdcEvents =
139+
p.apply("ReadFromIceberg", Managed.read(ICEBERG_CDC).withConfig(icebergReadConfig))
140+
.getSinglePCollection()
141+
.setRowSchema(SOURCE_SCHEMA);
142+
143+
// Aggregate passenger counts per hour
144+
PCollection<Row> aggregatedRows =
145+
cdcEvents
146+
.apply(
147+
"ApplyHourlyWindow",
148+
Window.<Row>into(FixedWindows.of(Duration.standardMinutes(10))))
149+
.apply("ExtractHourAndCount", ParDo.of(new ExtractHourAndPassengerCount()))
150+
.apply("SumPassengerCountPerHour", Sum.longsPerKey())
151+
.apply("FormatToRow", ParDo.of(new FormatAggregatedRow()))
152+
.setCoder(RowCoder.of(HOURLY_PASSENGER_COUNT_SCHEMA));
153+
154+
// Configure the Iceberg write
155+
Map<String, Object> icebergWriteConfig =
156+
ImmutableMap.<String, Object>builder()
157+
.put("table", destinationTable)
158+
.put("partition_fields", ImmutableList.of("day(ride_hour)"))
159+
.put("catalog_properties", catalogProps)
160+
.put("catalog_name", catalogName)
161+
.put("triggering_frequency_seconds", triggeringFrequencySeconds)
162+
.build();
163+
164+
// Write the aggregated results to the destination Iceberg table
165+
aggregatedRows.apply(
166+
"WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
167+
168+
p.run().waitUntilFinish();
169+
}
170+
171+
private static class FormatAggregatedRow extends DoFn<KV<String, Long>, Row> {
172+
@ProcessElement
173+
public void processElement(@Element KV<String, Long> kv, OutputReceiver<Row> out) {
174+
Row row =
175+
Row.withSchema(HOURLY_PASSENGER_COUNT_SCHEMA)
176+
.withFieldValue("ride_hour", DateTime.parse(kv.getKey()))
177+
.withFieldValue("passenger_count", kv.getValue())
178+
.build();
179+
out.output(row);
180+
}
181+
}
182+
183+
private static class ExtractHourAndPassengerCount extends DoFn<Row, KV<String, Long>> {
184+
@ProcessElement
185+
public void processElement(@Element Row row, OutputReceiver<KV<String, Long>> out) {
186+
DateTime rideHour =
187+
((DateTime) Preconditions.checkStateNotNull(row.getDateTime("ride_minute")))
188+
.withSecondOfMinute(0)
189+
.withMillisOfSecond(0);
190+
out.output(
191+
KV.of(
192+
rideHour.toString(),
193+
Preconditions.checkStateNotNull(row.getInt64("passenger_count"))));
194+
}
195+
}
196+
197+
/** Pipeline options for this example. */
198+
public interface IcebergCdcOptions extends GcpOptions {
199+
@Description("The source Iceberg table to read CDC events from")
200+
@Default.String("taxi_dataset.ride_metrics_by_minute")
201+
String getSourceTable();
202+
203+
void setSourceTable(String sourceTable);
204+
205+
@Description("The destination Iceberg table to write aggregated results to")
206+
@Default.String("taxi_dataset.passenger_count_by_hour")
207+
String getDestinationTable();
208+
209+
void setDestinationTable(String destinationTable);
210+
211+
@Description("Warehouse location for the Iceberg catalog")
212+
@Default.String("gs://biglake_taxi_ride_metrics")
213+
String getWarehouse();
214+
215+
void setWarehouse(String warehouse);
216+
217+
@Description("The URI for the REST catalog")
218+
@Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
219+
String getCatalogUri();
220+
221+
void setCatalogUri(String value);
222+
223+
@Description("The name of the Iceberg catalog")
224+
@Default.String("taxi_rides")
225+
String getCatalogName();
226+
227+
void setCatalogName(String catalogName);
228+
229+
@Description("Trigger the streaming write example")
230+
@Default.Boolean(false)
231+
boolean getTriggerStreamingWrite();
232+
233+
void setTriggerStreamingWrite(boolean triggerStreamingWrite);
234+
}
235+
}

0 commit comments

Comments
 (0)