Skip to content

Commit e0719c4

Browse files
committed
Rename ReloadJob to GetJob in ServingService API
1 parent 1cef42c commit e0719c4

5 files changed

Lines changed: 50 additions & 59 deletions

File tree

protos/feast/serving/ServingService.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ service ServingService {
4747
// The client is assumed to have access to these file URIs.
4848
rpc GetBatchFeatures (GetFeaturesRequest) returns (GetBatchFeaturesResponse);
4949

50-
// Reload the job status with the latest state.
51-
rpc ReloadJob(ReloadJobRequest) returns (ReloadJobResponse);
50+
// Get the latest job status for batch feature retrieval.
51+
rpc GetJob (GetJobRequest) returns (GetJobResponse);
5252
}
5353

5454
message GetFeastServingVersionRequest {}
@@ -115,11 +115,11 @@ message GetBatchFeaturesResponse {
115115
Job job = 1;
116116
}
117117

118-
message ReloadJobRequest {
118+
message GetJobRequest {
119119
Job job = 1;
120120
}
121121

122-
message ReloadJobResponse {
122+
message GetJobResponse {
123123
Job job = 1;
124124
}
125125

serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import feast.serving.ServingAPIProto.GetFeastServingVersionRequest;
88
import feast.serving.ServingAPIProto.GetFeastServingVersionResponse;
99
import feast.serving.ServingAPIProto.GetFeaturesRequest;
10+
import feast.serving.ServingAPIProto.GetJobRequest;
11+
import feast.serving.ServingAPIProto.GetJobResponse;
1012
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
11-
import feast.serving.ServingAPIProto.ReloadJobRequest;
12-
import feast.serving.ServingAPIProto.ReloadJobResponse;
1313
import feast.serving.ServingServiceGrpc.ServingServiceImplBase;
1414
import feast.serving.service.ServingService;
1515
import feast.serving.util.RequestHelper;
@@ -79,13 +79,12 @@ public void getBatchFeatures(
7979
responseObserver.onError(e);
8080
}
8181
}
82-
82+
8383
@Override
84-
public void reloadJob(
85-
ReloadJobRequest request, StreamObserver<ReloadJobResponse> responseObserver) {
84+
public void getJob(GetJobRequest request, StreamObserver<GetJobResponse> responseObserver) {
8685
try {
87-
ReloadJobResponse reloadJobStatusResponse = servingService.reloadJob(request);
88-
responseObserver.onNext(reloadJobStatusResponse);
86+
GetJobResponse response = servingService.getJob(request);
87+
responseObserver.onNext(response);
8988
responseObserver.onCompleted();
9089
} catch (Exception e) {
9190
responseObserver.onError(e);

serving/src/main/java/feast/serving/service/BigQueryServingService.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import feast.serving.ServingAPIProto.GetFeastServingTypeResponse;
2323
import feast.serving.ServingAPIProto.GetFeaturesRequest;
2424
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityRow;
25+
import feast.serving.ServingAPIProto.GetJobRequest;
26+
import feast.serving.ServingAPIProto.GetJobResponse;
2527
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
2628
import feast.serving.ServingAPIProto.JobStatus;
2729
import feast.serving.ServingAPIProto.JobType;
28-
import feast.serving.ServingAPIProto.ReloadJobRequest;
29-
import feast.serving.ServingAPIProto.ReloadJobResponse;
3030
import feast.serving.util.BigQueryUtil;
3131
import io.grpc.Status;
3232
import java.util.ArrayList;
@@ -106,12 +106,11 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
106106
if (getFeaturesRequest.getEntityRowsCount() < 1) {
107107
throw Status.INVALID_ARGUMENT
108108
.withDescription(
109-
"entity_dataset_rows is required for batch retrieval in order to filter the retrieved entities.")
109+
"entity_row is required for batch retrieval in order to filter the retrieved entities.")
110110
.asRuntimeException();
111111
}
112112

113-
for (EntityRow entityRow :
114-
getFeaturesRequest.getEntityRowsList()) {
113+
for (EntityRow entityRow : getFeaturesRequest.getEntityRowsList()) {
115114
if (entityRow.getEntityTimestamp().getSeconds() == 0) {
116115
throw Status.INVALID_ARGUMENT
117116
.withDescription(
@@ -169,10 +168,11 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
169168

170169
try {
171170
queryConfig = queryJob.getConfiguration();
171+
172+
// Hardcode the format to Avro for now
172173
String exportTableDestinationUri =
173174
String.format("%s/%s/*.avro", jobStagingLocation, feastJobId);
174175

175-
// Hardcode the format to Avro for now
176176
ExtractJobConfiguration extractConfig =
177177
ExtractJobConfiguration.of(
178178
queryConfig.getDestinationTable(), exportTableDestinationUri, "Avro");
@@ -221,13 +221,13 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
221221
}
222222

223223
@Override
224-
public ReloadJobResponse reloadJob(ReloadJobRequest reloadJobRequest) {
225-
Optional<ServingAPIProto.Job> job = jobService.get(reloadJobRequest.getJob().getId());
224+
public GetJobResponse getJob(GetJobRequest request) {
225+
Optional<ServingAPIProto.Job> job = jobService.get(request.getJob().getId());
226226
if (!job.isPresent()) {
227227
throw Status.NOT_FOUND
228-
.withDescription(String.format("Job not found: %s", reloadJobRequest.getJob().getId()))
228+
.withDescription(String.format("Job not found: %s", request.getJob().getId()))
229229
.asRuntimeException();
230230
}
231-
return ReloadJobResponse.newBuilder().setJob(job.get()).build();
231+
return GetJobResponse.newBuilder().setJob(job.get()).build();
232232
}
233233
}

serving/src/main/java/feast/serving/service/RedisServingService.java

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import feast.serving.ServingAPIProto.GetFeaturesRequest;
3232
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityRow;
3333
import feast.serving.ServingAPIProto.GetFeaturesRequest.FeatureSet;
34+
import feast.serving.ServingAPIProto.GetJobRequest;
35+
import feast.serving.ServingAPIProto.GetJobResponse;
3436
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
3537
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
36-
import feast.serving.ServingAPIProto.ReloadJobRequest;
37-
import feast.serving.ServingAPIProto.ReloadJobResponse;
3838
import feast.storage.RedisProto.RedisKey;
3939
import feast.types.FeatureRowProto.FeatureRow;
4040
import feast.types.FieldProto.Field;
@@ -62,9 +62,7 @@ public RedisServingService(JedisPool jedisPool, SpecService specService, Tracer
6262
this.tracer = tracer;
6363
}
6464

65-
/**
66-
* {@inheritDoc}
67-
*/
65+
/** {@inheritDoc} */
6866
@Override
6967
public GetFeastServingTypeResponse getFeastServingType(
7068
GetFeastServingTypeRequest getFeastServingTypeRequest) {
@@ -73,14 +71,12 @@ public GetFeastServingTypeResponse getFeastServingType(
7371
.build();
7472
}
7573

76-
/**
77-
* {@inheritDoc}
78-
*/
74+
/** {@inheritDoc} */
7975
@Override
8076
public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest request) {
8177
try (Scope scope = tracer.buildSpan("Redis-getOnlineFeatures").startActive(true)) {
82-
GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder = GetOnlineFeaturesResponse
83-
.newBuilder();
78+
GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder =
79+
GetOnlineFeaturesResponse.newBuilder();
8480

8581
List<EntityRow> entityRows = request.getEntityRowsList();
8682
Map<EntityRow, Map<String, Value>> featureValuesMap =
@@ -112,8 +108,8 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest request) {
112108
featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build();
113109
}
114110

115-
List<RedisKey> redisKeys = getRedisKeys(featureSetEntityNames, entityRows,
116-
featureSetRequest);
111+
List<RedisKey> redisKeys =
112+
getRedisKeys(featureSetEntityNames, entityRows, featureSetRequest);
117113

118114
try {
119115
sendAndProcessMultiGet(redisKeys, entityRows, featureValuesMap, featureSetRequest);
@@ -124,9 +120,10 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest request) {
124120
.asRuntimeException();
125121
}
126122
}
127-
List<FieldValues> fieldValues = featureValuesMap.values().stream()
128-
.map(m -> FieldValues.newBuilder().putAllFields(m).build())
129-
.collect(Collectors.toList());
123+
List<FieldValues> fieldValues =
124+
featureValuesMap.values().stream()
125+
.map(m -> FieldValues.newBuilder().putAllFields(m).build())
126+
.collect(Collectors.toList());
130127
return getOnlineFeaturesResponseBuilder.addAllFieldValues(fieldValues).build();
131128
}
132129
}
@@ -136,9 +133,8 @@ public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesR
136133
throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
137134
}
138135

139-
140136
@Override
141-
public ReloadJobResponse reloadJob(ReloadJobRequest reloadJobRequest) {
137+
public GetJobResponse getJob(GetJobRequest getJobRequest) {
142138
throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
143139
}
144140

@@ -158,8 +154,7 @@ private List<RedisKey> getRedisKeys(
158154
String featureSetId =
159155
String.format("%s:%s", featureSetRequest.getName(), featureSetRequest.getVersion());
160156
List<RedisKey> redisKeys =
161-
entityRows
162-
.stream()
157+
entityRows.stream()
163158
.map(row -> makeRedisKey(featureSetId, featureSetEntityNames, row))
164159
.collect(Collectors.toList());
165160
return redisKeys;
@@ -175,16 +170,13 @@ private List<RedisKey> getRedisKeys(
175170
* @return {@link RedisKey}
176171
*/
177172
private RedisKey makeRedisKey(
178-
String featureSet,
179-
List<String> featureSetEntityNames,
180-
EntityRow entityRow) {
173+
String featureSet, List<String> featureSetEntityNames, EntityRow entityRow) {
181174
RedisKey.Builder builder = RedisKey.newBuilder().setFeatureSet(featureSet);
182175
Map<String, Value> fieldsMap = entityRow.getFieldsMap();
183176
for (int i = 0; i < featureSetEntityNames.size(); i++) {
184177
String entityName = featureSetEntityNames.get(i);
185-
builder.addEntities(Field.newBuilder()
186-
.setName(entityName)
187-
.setValue(fieldsMap.get(entityName)));
178+
builder.addEntities(
179+
Field.newBuilder().setName(entityName).setValue(fieldsMap.get(entityName)));
188180
}
189181
return builder.build();
190182
}
@@ -199,11 +191,13 @@ private void sendAndProcessMultiGet(
199191
List<byte[]> jedisResps = sendMultiGet(redisKeys);
200192

201193
try (Scope scope = tracer.buildSpan("Redis-processResponse").startActive(true)) {
202-
String featureSetId = String
203-
.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion());
204-
Map<String, Value> nullValues = featureSetRequest.getFeatureNamesList().stream()
205-
.collect(Collectors
206-
.toMap(name -> featureSetId + ":" + name, name -> Value.newBuilder().build()));
194+
String featureSetId =
195+
String.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion());
196+
Map<String, Value> nullValues =
197+
featureSetRequest.getFeatureNamesList().stream()
198+
.collect(
199+
Collectors.toMap(
200+
name -> featureSetId + ":" + name, name -> Value.newBuilder().build()));
207201
for (int i = 0; i < jedisResps.size(); i++) {
208202
EntityRow entityRow = entityRows.get(i);
209203
Map<String, Value> featureValues = featureValuesMap.get(entityRow);
@@ -225,18 +219,16 @@ private void sendAndProcessMultiGet(
225219
}
226220
}
227221

228-
private boolean isStale(FeatureSet featureSetRequest, EntityRow entityRow,
229-
FeatureRow featureRow) {
222+
private boolean isStale(
223+
FeatureSet featureSetRequest, EntityRow entityRow, FeatureRow featureRow) {
230224
if (featureSetRequest.getMaxAge() == Duration.getDefaultInstance()) {
231225
return false;
232226
}
233227
long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();
234228
if (givenTimestamp == 0) {
235229
givenTimestamp = System.currentTimeMillis();
236230
}
237-
long timeDifference =
238-
givenTimestamp - featureRow.getEventTimestamp()
239-
.getSeconds();
231+
long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds();
240232
return timeDifference > featureSetRequest.getMaxAge().getSeconds();
241233
}
242234

serving/src/main/java/feast/serving/service/ServingService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import feast.serving.ServingAPIProto.GetFeastServingTypeRequest;
55
import feast.serving.ServingAPIProto.GetFeastServingTypeResponse;
66
import feast.serving.ServingAPIProto.GetFeaturesRequest;
7+
import feast.serving.ServingAPIProto.GetJobRequest;
8+
import feast.serving.ServingAPIProto.GetJobResponse;
79
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
8-
import feast.serving.ServingAPIProto.ReloadJobRequest;
9-
import feast.serving.ServingAPIProto.ReloadJobResponse;
1010

1111
public interface ServingService {
1212
GetFeastServingTypeResponse getFeastServingType(
@@ -16,5 +16,5 @@ GetFeastServingTypeResponse getFeastServingType(
1616

1717
GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesRequest);
1818

19-
ReloadJobResponse reloadJob(ReloadJobRequest reloadJobStatusRequest);
19+
GetJobResponse getJob(GetJobRequest getJobRequest);
2020
}

0 commit comments

Comments
 (0)