Skip to content

Commit 67259e0

Browse files
authored
Add PubSub RateLimiter example (#37992)
* Add PubSub RateLimiter example * remove shuffle
1 parent 303a076 commit 67259e0

1 file changed

Lines changed: 135 additions & 0 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18+
package org.apache.beam.examples;
19+
20+
import org.apache.beam.sdk.Pipeline;
21+
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext;
22+
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory;
23+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter;
24+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext;
25+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory;
26+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions;
27+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
28+
import org.apache.beam.sdk.options.Description;
29+
import org.apache.beam.sdk.options.PipelineOptions;
30+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
31+
import org.apache.beam.sdk.options.Validation;
32+
import org.apache.beam.sdk.transforms.DoFn;
33+
import org.apache.beam.sdk.transforms.ParDo;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
35+
import org.checkerframework.checker.nullness.qual.Nullable;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
/**
40+
* A simple example demonstrating how to read from a Pub/Sub topic, rate limit the stream using an
41+
* Envoy Rate Limit Service, and simply log the raw records.
42+
*
43+
* <p>To run this example, you need a running Envoy Rate Limit Service and access to the GCP Pub/Sub
44+
* topic.
45+
*/
46+
public class RateLimitedPubSubReader {
47+
48+
public interface Options extends PipelineOptions {
49+
@Description("Address of the Envoy Rate Limit Service (eg: localhost:8081)")
50+
@Validation.Required
51+
String getRateLimiterAddress();
52+
53+
void setRateLimiterAddress(String value);
54+
55+
@Description("Domain for the Rate Limit Service (eg: mydomain)")
56+
@Validation.Required
57+
String getRateLimiterDomain();
58+
59+
void setRateLimiterDomain(String value);
60+
61+
@Description(
62+
"The Pub/Sub topic to read from (eg: projects/pubsub-public-data/topics/taxirides-realtime)")
63+
@Validation.Required
64+
String getTopic();
65+
66+
void setTopic(String value);
67+
}
68+
69+
static class RateLimitAndLogFn extends DoFn<String, String> {
70+
private final String rlsAddress;
71+
private final String rlsDomain;
72+
private transient @Nullable RateLimiter rateLimiter;
73+
private static final Logger LOG = LoggerFactory.getLogger(RateLimitAndLogFn.class);
74+
75+
public RateLimitAndLogFn(String rlsAddress, String rlsDomain) {
76+
this.rlsAddress = rlsAddress;
77+
this.rlsDomain = rlsDomain;
78+
}
79+
80+
@Setup
81+
public void setup() {
82+
// Create the RateLimiterOptions.
83+
RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build();
84+
85+
// Static RateLimiter with pre-configured domain and descriptors
86+
RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
87+
RateLimiterContext context =
88+
EnvoyRateLimiterContext.builder()
89+
.setDomain(rlsDomain)
90+
.addDescriptor("database", "users") // generic descriptors
91+
.build();
92+
this.rateLimiter = factory.getLimiter(context);
93+
}
94+
95+
@Teardown
96+
public void teardown() {
97+
if (rateLimiter != null) {
98+
try {
99+
rateLimiter.close();
100+
} catch (Exception e) {
101+
LOG.warn("Failed to close RateLimiter", e);
102+
}
103+
}
104+
}
105+
106+
@ProcessElement
107+
public void processElement(ProcessContext c) throws Exception {
108+
String element = c.element();
109+
try {
110+
Preconditions.checkNotNull(rateLimiter).allow(1);
111+
} catch (Exception e) {
112+
throw new RuntimeException("Failed to acquire rate limit token", e);
113+
}
114+
115+
// Simulate external API call or simply log the read entry
116+
Thread.sleep(100);
117+
LOG.info("Received and rate-limited record: {}", element);
118+
c.output(element);
119+
}
120+
}
121+
122+
public static void main(String[] args) {
123+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
124+
Pipeline p = Pipeline.create(options);
125+
126+
p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
127+
.apply(
128+
"RateLimitAndLog",
129+
ParDo.of(
130+
new RateLimitAndLogFn(
131+
options.getRateLimiterAddress(), options.getRateLimiterDomain())));
132+
133+
p.run().waitUntilFinish();
134+
}
135+
}

0 commit comments

Comments
 (0)