Skip to content

Commit 256351d

Browse files
committed
fix: Make Java gRPC client use timeouts as expected
Signed-off-by: Jose Acevedo <sharp.acevedo@gmail.com>
1 parent c08e6f9 commit 256351d

File tree

2 files changed

+25
-18
lines changed

2 files changed

+25
-18
lines changed

java/serving-client/src/main/java/dev/feast/FeastClient.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
2727
import feast.proto.types.ValueProto;
2828
import io.grpc.CallCredentials;
29-
import io.grpc.Deadline;
3029
import io.grpc.ManagedChannel;
3130
import io.grpc.ManagedChannelBuilder;
3231
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
@@ -50,6 +49,7 @@ public class FeastClient implements AutoCloseable {
5049

5150
private final ManagedChannel channel;
5251
private final ServingServiceBlockingStub stub;
52+
private final long requestTimeout;
5353

5454
/**
5555
* Create a client to access Feast Serving.
@@ -68,12 +68,14 @@ public static FeastClient create(String host, int port) {
6868
*
6969
* @param host hostname or ip address of Feast serving GRPC server
7070
* @param port port number of Feast serving GRPC server
71-
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
71+
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
72+
* milliseconds, use 0 for no timeout
7273
* @return {@link FeastClient}
7374
*/
74-
public static FeastClient create(String host, int port, Deadline deadline) {
75+
public static FeastClient create(String host, int port, long requestTimeout) {
7576
// configure client with no security config.
76-
return FeastClient.createSecure(host, port, SecurityConfig.newBuilder().build(), deadline);
77+
return FeastClient.createSecure(
78+
host, port, SecurityConfig.newBuilder().build(), requestTimeout);
7779
}
7880

7981
/**
@@ -86,7 +88,7 @@ public static FeastClient create(String host, int port, Deadline deadline) {
8688
* @return {@link FeastClient}
8789
*/
8890
public static FeastClient createSecure(String host, int port, SecurityConfig securityConfig) {
89-
return createSecure(host, port, securityConfig, null);
91+
return FeastClient.createSecure(host, port, securityConfig, 0);
9092
}
9193

9294
/**
@@ -96,11 +98,17 @@ public static FeastClient createSecure(String host, int port, SecurityConfig sec
9698
* @param port port number of Feast serving GRPC server
9799
* @param securityConfig security options to configure the Feast client. See {@link
98100
* SecurityConfig} for options.
99-
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
101+
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
102+
* milliseconds
100103
* @return {@link FeastClient}
101104
*/
102105
public static FeastClient createSecure(
103-
String host, int port, SecurityConfig securityConfig, Deadline deadline) {
106+
String host, int port, SecurityConfig securityConfig, long requestTimeout) {
107+
108+
if (requestTimeout < 0) {
109+
throw new IllegalArgumentException("Request timeout can't be negative");
110+
}
111+
104112
// Configure client TLS
105113
ManagedChannel channel = null;
106114
if (securityConfig.isTLSEnabled()) {
@@ -127,7 +135,7 @@ public static FeastClient createSecure(
127135
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
128136
}
129137

130-
return new FeastClient(channel, securityConfig.getCredentials(), Optional.ofNullable(deadline));
138+
return new FeastClient(channel, securityConfig.getCredentials(), requestTimeout);
131139
}
132140

133141
/**
@@ -158,7 +166,10 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities)
158166

159167
requestBuilder.putAllEntities(getEntityValuesMap(entities));
160168

161-
GetOnlineFeaturesResponse response = stub.getOnlineFeatures(requestBuilder.build());
169+
ServingServiceGrpc.ServingServiceBlockingStub timedStub =
170+
requestTimeout != 0 ? stub.withDeadlineAfter(requestTimeout, TimeUnit.MILLISECONDS) : stub;
171+
172+
GetOnlineFeaturesResponse response = timedStub.getOnlineFeatures(requestBuilder.build());
162173

163174
List<Row> results = Lists.newArrayList();
164175
if (response.getResultsCount() == 0) {
@@ -231,12 +242,13 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, Str
231242
}
232243

233244
protected FeastClient(ManagedChannel channel, Optional<CallCredentials> credentials) {
234-
this(channel, credentials, Optional.empty());
245+
this(channel, credentials, -1);
235246
}
236247

237248
protected FeastClient(
238-
ManagedChannel channel, Optional<CallCredentials> credentials, Optional<Deadline> deadline) {
249+
ManagedChannel channel, Optional<CallCredentials> credentials, long requestTimeout) {
239250
this.channel = channel;
251+
this.requestTimeout = requestTimeout;
240252
TracingClientInterceptor tracingInterceptor =
241253
TracingClientInterceptor.newBuilder().withTracer(GlobalTracer.get()).build();
242254

@@ -247,10 +259,6 @@ protected FeastClient(
247259
servingStub = servingStub.withCallCredentials(credentials.get());
248260
}
249261

250-
if (deadline.isPresent()) {
251-
servingStub = servingStub.withDeadline(deadline.get());
252-
}
253-
254262
this.stub = servingStub;
255263
}
256264

java/serving-client/src/test/java/dev/feast/FeastClientTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Optional;
41-
import java.util.concurrent.TimeUnit;
4241
import java.util.concurrent.atomic.AtomicBoolean;
4342
import org.junit.Before;
4443
import org.junit.Rule;
4544
import org.junit.Test;
4645

4746
public class FeastClientTest {
4847
private final String AUTH_TOKEN = "test token";
49-
private final Deadline DEADLINE = Deadline.after(2, TimeUnit.SECONDS);
48+
private final long TIMEOUT_MILLIS = 100;
5049

5150
@Rule public GrpcCleanupRule grpcRule;
5251
private AtomicBoolean isAuthenticated;
@@ -88,7 +87,7 @@ public void setup() throws Exception {
8887
ManagedChannel channel =
8988
this.grpcRule.register(
9089
InProcessChannelBuilder.forName(serverName).directExecutor().build());
91-
this.client = new FeastClient(channel, Optional.empty(), Optional.of(DEADLINE));
90+
this.client = new FeastClient(channel, Optional.empty(), TIMEOUT_MILLIS);
9291
}
9392

9493
@Test

0 commit comments

Comments
 (0)