2626import feast .proto .serving .ServingServiceGrpc .ServingServiceBlockingStub ;
2727import feast .proto .types .ValueProto ;
2828import io .grpc .CallCredentials ;
29- import io .grpc .Deadline ;
3029import io .grpc .ManagedChannel ;
3130import io .grpc .ManagedChannelBuilder ;
3231import io .grpc .netty .shaded .io .grpc .netty .GrpcSslContexts ;
@@ -50,6 +49,7 @@ public class FeastClient implements AutoCloseable {
5049
5150 private final ManagedChannel channel ;
5251 private final ServingServiceBlockingStub stub ;
52+ private final long requestTimeout ;
5353
5454 /**
5555 * Create a client to access Feast Serving.
@@ -68,12 +68,14 @@ public static FeastClient create(String host, int port) {
6868 *
6969 * @param host hostname or ip address of Feast serving GRPC server
7070 * @param port port number of Feast serving GRPC server
71- * @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
71+ * @param requestTimeout maximum duration for online retrievals from the GRPC server in
72+ * milliseconds, use 0 for no timeout
7273 * @return {@link FeastClient}
7374 */
74- public static FeastClient create (String host , int port , Deadline deadline ) {
75+ public static FeastClient create (String host , int port , long requestTimeout ) {
7576 // configure client with no security config.
76- return FeastClient .createSecure (host , port , SecurityConfig .newBuilder ().build (), deadline );
77+ return FeastClient .createSecure (
78+ host , port , SecurityConfig .newBuilder ().build (), requestTimeout );
7779 }
7880
7981 /**
@@ -86,7 +88,7 @@ public static FeastClient create(String host, int port, Deadline deadline) {
8688 * @return {@link FeastClient}
8789 */
8890 public static FeastClient createSecure (String host , int port , SecurityConfig securityConfig ) {
89- return createSecure (host , port , securityConfig , null );
91+ return FeastClient . createSecure (host , port , securityConfig , 0 );
9092 }
9193
9294 /**
@@ -96,11 +98,17 @@ public static FeastClient createSecure(String host, int port, SecurityConfig sec
9698 * @param port port number of Feast serving GRPC server
9799 * @param securityConfig security options to configure the Feast client. See {@link
98100 * SecurityConfig} for options.
99- * @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
101+ * @param requestTimeout maximum duration for online retrievals from the GRPC server in
102+ * milliseconds
100103 * @return {@link FeastClient}
101104 */
102105 public static FeastClient createSecure (
103- String host , int port , SecurityConfig securityConfig , Deadline deadline ) {
106+ String host , int port , SecurityConfig securityConfig , long requestTimeout ) {
107+
108+ if (requestTimeout < 0 ) {
109+ throw new IllegalArgumentException ("Request timeout can't be negative" );
110+ }
111+
104112 // Configure client TLS
105113 ManagedChannel channel = null ;
106114 if (securityConfig .isTLSEnabled ()) {
@@ -127,7 +135,7 @@ public static FeastClient createSecure(
127135 channel = ManagedChannelBuilder .forAddress (host , port ).usePlaintext ().build ();
128136 }
129137
130- return new FeastClient (channel , securityConfig .getCredentials (), Optional . ofNullable ( deadline ) );
138+ return new FeastClient (channel , securityConfig .getCredentials (), requestTimeout );
131139 }
132140
133141 /**
@@ -158,7 +166,10 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities)
158166
159167 requestBuilder .putAllEntities (getEntityValuesMap (entities ));
160168
161- GetOnlineFeaturesResponse response = stub .getOnlineFeatures (requestBuilder .build ());
169+ ServingServiceGrpc .ServingServiceBlockingStub timedStub =
170+ requestTimeout != 0 ? stub .withDeadlineAfter (requestTimeout , TimeUnit .MILLISECONDS ) : stub ;
171+
172+ GetOnlineFeaturesResponse response = timedStub .getOnlineFeatures (requestBuilder .build ());
162173
163174 List <Row > results = Lists .newArrayList ();
164175 if (response .getResultsCount () == 0 ) {
@@ -231,12 +242,13 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, Str
231242 }
232243
233244 protected FeastClient (ManagedChannel channel , Optional <CallCredentials > credentials ) {
234- this (channel , credentials , Optional . empty () );
245+ this (channel , credentials , - 1 );
235246 }
236247
237248 protected FeastClient (
238- ManagedChannel channel , Optional <CallCredentials > credentials , Optional < Deadline > deadline ) {
249+ ManagedChannel channel , Optional <CallCredentials > credentials , long requestTimeout ) {
239250 this .channel = channel ;
251+ this .requestTimeout = requestTimeout ;
240252 TracingClientInterceptor tracingInterceptor =
241253 TracingClientInterceptor .newBuilder ().withTracer (GlobalTracer .get ()).build ();
242254
@@ -247,10 +259,6 @@ protected FeastClient(
247259 servingStub = servingStub .withCallCredentials (credentials .get ());
248260 }
249261
250- if (deadline .isPresent ()) {
251- servingStub = servingStub .withDeadline (deadline .get ());
252- }
253-
254262 this .stub = servingStub ;
255263 }
256264
0 commit comments