Skip to content

Commit b075406

Browse files
authored
Add Iceberg Rest Catalog Example (#35599)
* Add Iceberg Rest Catalog Example * Add Iceberg Rest Catalog Example * fix imports * fix imports * fix comments * change default bucket name * fix null check * simplify kv
1 parent c040895 commit b075406

1 file changed

Lines changed: 198 additions & 0 deletions

File tree

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.cookbook;
19+
20+
import com.google.auth.oauth2.GoogleCredentials;
21+
import java.io.IOException;
22+
import java.util.Map;
23+
import org.apache.beam.sdk.Pipeline;
24+
import org.apache.beam.sdk.coders.RowCoder;
25+
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
26+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
27+
import org.apache.beam.sdk.managed.Managed;
28+
import org.apache.beam.sdk.options.Default;
29+
import org.apache.beam.sdk.options.Description;
30+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
31+
import org.apache.beam.sdk.options.Validation;
32+
import org.apache.beam.sdk.schemas.Schema;
33+
import org.apache.beam.sdk.transforms.DoFn;
34+
import org.apache.beam.sdk.transforms.Filter;
35+
import org.apache.beam.sdk.transforms.JsonToRow;
36+
import org.apache.beam.sdk.transforms.MapElements;
37+
import org.apache.beam.sdk.transforms.ParDo;
38+
import org.apache.beam.sdk.transforms.Sum;
39+
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
40+
import org.apache.beam.sdk.transforms.windowing.Window;
41+
import org.apache.beam.sdk.util.Preconditions;
42+
import org.apache.beam.sdk.values.KV;
43+
import org.apache.beam.sdk.values.PCollection;
44+
import org.apache.beam.sdk.values.Row;
45+
import org.apache.beam.sdk.values.TypeDescriptors;
46+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
47+
import org.joda.time.Duration;
48+
49+
/**
50+
* Reads real-time NYC taxi ride information from {@code
51+
* projects/pubsub-public-data/topics/taxirides-realtime} and writes aggregated passenger count data
52+
* to an Iceberg table using Beam's {@link Managed} IcebergIO sink.
53+
*
54+
* <p>This is a streaming pipeline that processes taxi ride events, filters for 'dropoff' status,
55+
* aggregates passenger counts within fixed 10-second windows by minute of the ride, and writes the
56+
* results to a single Iceberg table. The Iceberg sink triggers writes every 30 seconds, creating
57+
* new snapshots.
58+
*
59+
* <p>This example is a demonstration of the Iceberg REST Catalog. For more information, see the
60+
* documentation at {@link https://cloud.google.com/bigquery/docs/blms-rest-catalog}.
61+
*/
62+
public class IcebergRestCatalogStreamingWriteExample {
63+
64+
public static final Schema TAXIRIDES_SCHEMA =
65+
Schema.builder()
66+
.addInt64Field("passenger_count")
67+
.addStringField("ride_status")
68+
.addDateTimeField("timestamp")
69+
.build();
70+
71+
public static final Schema AGGREGATED_SCHEMA =
72+
Schema.builder().addStringField("ride_minute").addInt64Field("passenger_count").build();
73+
74+
/**
75+
* Main entry point for the pipeline.
76+
*
77+
* @param args Command line arguments
78+
* @throws IOException if there's an issue with GoogleCredentials
79+
*/
80+
public static void main(String[] args) throws IOException {
81+
IcebergPipelineOptions options =
82+
PipelineOptionsFactory.fromArgs(args).withValidation().as(IcebergPipelineOptions.class);
83+
options.setProject("apache-beam-testing");
84+
85+
final String tableIdentifier = "taxi_dataset.passenger_count_by_minute";
86+
final String pubsubTopic = options.getTopic();
87+
final String catalogUri = options.getCatalogUri();
88+
final String warehouseLocation = options.getWarehouse();
89+
final String projectName = options.getProject();
90+
final String catalogName = options.getCatalogName();
91+
final int triggeringFrequencySeconds = 30;
92+
93+
// Note: The token expires in 1 hour, and users may need to re-run the pipeline.
94+
// Upcoming changes in Iceberg and the BigLake Metastore with the Iceberg REST Catalog
95+
// will support token refreshing and credential vending.
96+
Map<String, String> catalogProps =
97+
ImmutableMap.<String, String>builder()
98+
.put("type", "rest")
99+
.put("uri", catalogUri)
100+
.put("warehouse", warehouseLocation)
101+
.put("header.x-goog-user-project", projectName)
102+
.put("oauth2-server-uri", "https://oauth2.googleapis.com/token")
103+
.put(
104+
"token",
105+
GoogleCredentials.getApplicationDefault().refreshAccessToken().getTokenValue())
106+
.put("rest-metrics-reporting-enabled", "false")
107+
.build();
108+
109+
Map<String, Object> icebergWriteConfig =
110+
ImmutableMap.<String, Object>builder()
111+
.put("table", tableIdentifier)
112+
.put("catalog_properties", catalogProps)
113+
.put("catalog_name", catalogName)
114+
.put("triggering_frequency_seconds", triggeringFrequencySeconds)
115+
.build();
116+
117+
Pipeline p = Pipeline.create(options);
118+
119+
PCollection<Row> aggregatedRows =
120+
p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(pubsubTopic))
121+
.apply("ConvertJsonToRow", JsonToRow.withSchema(TAXIRIDES_SCHEMA))
122+
.apply(
123+
"FilterNullFields",
124+
Filter.by(
125+
(Row row) ->
126+
row.getInt64("passenger_count") != null
127+
&& row.getDateTime("timestamp") != null))
128+
.apply(
129+
"FilterDropoffRides",
130+
Filter.by((Row row) -> "dropoff".equals(row.getString("ride_status"))))
131+
.apply(
132+
"ApplyFixedWindow", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(10))))
133+
.apply(
134+
"ExtractMinuteAndPassengerCount",
135+
MapElements.into(
136+
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
137+
.via(
138+
row ->
139+
KV.of(
140+
Preconditions.checkStateNotNull(row.getDateTime("timestamp"))
141+
.toString("yyyy-MM-dd HH:mm"),
142+
row.getInt64("passenger_count"))))
143+
.apply("SumPassengerCountPerMinute", Sum.longsPerKey())
144+
.apply(
145+
"FormatAggregatedRowForIceberg",
146+
ParDo.of(
147+
new DoFn<KV<String, Long>, Row>() {
148+
@ProcessElement
149+
public void processElement(
150+
@Element KV<String, Long> kv, OutputReceiver<Row> out) {
151+
Row row =
152+
Row.withSchema(AGGREGATED_SCHEMA)
153+
.withFieldValue("ride_minute", kv.getKey())
154+
.withFieldValue("passenger_count", kv.getValue())
155+
.build();
156+
out.output(row);
157+
}
158+
}))
159+
.setCoder(RowCoder.of(AGGREGATED_SCHEMA));
160+
161+
aggregatedRows.apply(
162+
"WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
163+
164+
p.run().waitUntilFinish();
165+
}
166+
167+
/** Pipeline options for the IcebergRestCatalogStreamingWriteExample. */
168+
public interface IcebergPipelineOptions extends GcpOptions {
169+
@Description(
170+
"Warehouse location where the table's data will be written to. "
171+
+ "As of 07/14/25 BigLake only supports Single Region buckets")
172+
@Validation.Required
173+
@Default.String("gs://biglake_taxi_rides")
174+
String getWarehouse();
175+
176+
void setWarehouse(String warehouse);
177+
178+
@Description("The URI for the REST catalog.")
179+
@Validation.Required
180+
@Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
181+
String getCatalogUri();
182+
183+
void setCatalogUri(String value);
184+
185+
@Description("The Pub/Sub topic to read from.")
186+
@Validation.Required
187+
@Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
188+
String getTopic();
189+
190+
void setTopic(String value);
191+
192+
@Validation.Required
193+
@Default.String("taxi_rides")
194+
String getCatalogName();
195+
196+
void setCatalogName(String catalogName);
197+
}
198+
}

0 commit comments

Comments
 (0)