Skip to content

Commit cbf3674

Browse files
Add Support for EnvoyRateLimiter Implementation (#37573)
* Add EnvoyRateLimiter Implementation * Add example * fix style check * simplify teardown Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * add more jitter Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * handle thread interrupt Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * add connection keep-alive configs --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 72ce207 commit cbf3674

13 files changed

Lines changed: 1023 additions & 0 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin<Project> {
726726
commons_logging : "commons-logging:commons-logging:1.2",
727727
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
728728
dbcp2 : "org.apache.commons:commons-dbcp2:$dbcp2_version",
729+
envoy_control_plane_api : "io.envoyproxy.controlplane:api:1.0.49",
729730
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
730731
failsafe : "dev.failsafe:failsafe:3.3.0",
731732
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.7.4",

examples/java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ dependencies {
5252
implementation project(":sdks:java:extensions:avro")
5353
implementation project(":sdks:java:extensions:google-cloud-platform-core")
5454
implementation project(":sdks:java:extensions:python")
55+
implementation project(":sdks:java:io:components")
5556
implementation project(":sdks:java:io:google-cloud-platform")
5657
implementation project(":sdks:java:io:kafka")
5758
implementation project(":sdks:java:extensions:ml")
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples;
19+
20+
import java.util.stream.Collectors;
21+
import java.util.stream.IntStream;
22+
import org.apache.beam.sdk.Pipeline;
23+
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext;
24+
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory;
25+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter;
26+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext;
27+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory;
28+
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions;
29+
import org.apache.beam.sdk.options.Description;
30+
import org.apache.beam.sdk.options.PipelineOptions;
31+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
32+
import org.apache.beam.sdk.transforms.Create;
33+
import org.apache.beam.sdk.transforms.DoFn;
34+
import org.apache.beam.sdk.transforms.ParDo;
35+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
36+
import org.checkerframework.checker.nullness.qual.Nullable;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
/**
41+
* A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}.
42+
*
43+
* <p>This pipeline creates a small set of elements and processes them using a DoFn that calls an
44+
* external service (simulated). The processing is rate-limited using an Envoy Rate Limit Service.
45+
*
46+
* <p>To run this example, you need a running Envoy Rate Limit Service.
47+
*/
48+
public class RateLimiterSimple {
49+
50+
public interface Options extends PipelineOptions {
51+
@Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)")
52+
String getRateLimiterAddress();
53+
54+
void setRateLimiterAddress(String value);
55+
56+
@Description("Domain for the Rate Limit Service(eg:mydomain)")
57+
String getRateLimiterDomain();
58+
59+
void setRateLimiterDomain(String value);
60+
}
61+
62+
static class CallExternalServiceFn extends DoFn<String, String> {
63+
private final String rlsAddress;
64+
private final String rlsDomain;
65+
private transient @Nullable RateLimiter rateLimiter;
66+
private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class);
67+
68+
public CallExternalServiceFn(String rlsAddress, String rlsDomain) {
69+
this.rlsAddress = rlsAddress;
70+
this.rlsDomain = rlsDomain;
71+
}
72+
73+
@Setup
74+
public void setup() {
75+
// Create the RateLimiterOptions.
76+
RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build();
77+
78+
// Static RateLimtier with pre-configured domain and descriptors
79+
RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
80+
RateLimiterContext context =
81+
EnvoyRateLimiterContext.builder()
82+
.setDomain(rlsDomain)
83+
.addDescriptor("database", "users")
84+
.build();
85+
this.rateLimiter = factory.getLimiter(context);
86+
}
87+
88+
@Teardown
89+
public void teardown() {
90+
if (rateLimiter != null) {
91+
try {
92+
rateLimiter.close();
93+
} catch (Exception e) {
94+
LOG.warn("Failed to close RateLimiter", e);
95+
}
96+
}
97+
}
98+
99+
@ProcessElement
100+
public void processElement(ProcessContext c) throws Exception {
101+
String element = c.element();
102+
try {
103+
Preconditions.checkNotNull(rateLimiter).allow(1);
104+
} catch (Exception e) {
105+
throw new RuntimeException("Failed to acquire rate limit token", e);
106+
}
107+
108+
// Simulate external API call
109+
LOG.info("Processing: " + element);
110+
Thread.sleep(100);
111+
c.output("Processed: " + element);
112+
}
113+
}
114+
115+
public static void main(String[] args) {
116+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
117+
Pipeline p = Pipeline.create(options);
118+
119+
p.apply(
120+
"CreateItems",
121+
Create.of(
122+
IntStream.range(0, 100).mapToObj(i -> "item" + i).collect(Collectors.toList())))
123+
.apply(
124+
"CallExternalService",
125+
ParDo.of(
126+
new CallExternalServiceFn(
127+
options.getRateLimiterAddress(), options.getRateLimiterDomain())));
128+
129+
p.run().waitUntilFinish();
130+
}
131+
}

sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
6363
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*ValidateRunnerXlangTest.*" />
6464
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" />
65+
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*components.*ratelimiter.*" />
6566
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
6667
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
6768
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*GoogleAds.*\.java" />

sdks/java/io/components/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ ext.summary = "Components for building fully featured IOs"
2626

2727
dependencies {
2828
implementation project(path: ":sdks:java:core", configuration: "shadow")
29+
implementation library.java.auto_value_annotations
30+
implementation library.java.envoy_control_plane_api
31+
implementation library.java.grpc_api
32+
implementation library.java.grpc_stub
33+
implementation library.java.grpc_protobuf
2934
implementation library.java.protobuf_java
3035
permitUnusedDeclared library.java.protobuf_java // BEAM-11761
3136
implementation library.java.slf4j_api
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.components.ratelimiter;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* A lightweight handle for an Envoy-based rate limiter.
24+
*
25+
* <p>Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link
26+
* EnvoyRateLimiterContext}.
27+
*/
28+
public class EnvoyRateLimiter implements RateLimiter {
29+
private final EnvoyRateLimiterFactory factory;
30+
private final EnvoyRateLimiterContext context;
31+
32+
public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) {
33+
this.factory = factory;
34+
this.context = context;
35+
}
36+
37+
@Override
38+
public boolean allow(int permits) throws IOException, InterruptedException {
39+
return factory.allow(context, permits);
40+
}
41+
42+
@Override
43+
public void close() throws Exception {
44+
factory.close();
45+
}
46+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.components.ratelimiter;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.util.Map;
22+
import org.apache.beam.sdk.schemas.AutoValueSchema;
23+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
24+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
25+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
26+
import org.checkerframework.checker.nullness.qual.NonNull;
27+
28+
/**
29+
* Context for an Envoy Rate Limiter call.
30+
*
31+
* <p>Contains the domain and descriptors required to define a specific rate limit bucket.
32+
*/
33+
@DefaultSchema(AutoValueSchema.class)
34+
@AutoValue
35+
public abstract class EnvoyRateLimiterContext implements RateLimiterContext {
36+
37+
@SchemaFieldDescription("Domain of the rate limiter.")
38+
public abstract String getDomain();
39+
40+
@SchemaFieldDescription("Descriptors for the rate limiter.")
41+
public abstract ImmutableMap<String, String> getDescriptors();
42+
43+
public static Builder builder() {
44+
return new AutoValue_EnvoyRateLimiterContext.Builder();
45+
}
46+
47+
@AutoValue.Builder
48+
public abstract static class Builder {
49+
public abstract Builder setDomain(@NonNull String domain);
50+
51+
public abstract ImmutableMap.Builder<String, String> descriptorsBuilder();
52+
53+
public Builder addDescriptor(@NonNull String key, @NonNull String value) {
54+
descriptorsBuilder().put(key, value);
55+
return this;
56+
}
57+
58+
public Builder setDescriptors(@NonNull Map<String, String> descriptors) {
59+
descriptorsBuilder().putAll(descriptors);
60+
return this;
61+
}
62+
63+
public abstract EnvoyRateLimiterContext build();
64+
}
65+
}

0 commit comments

Comments
 (0)