Skip to content

Commit e14f89b

Browse files
committed
Checkpoint
1 parent 1753e7b commit e14f89b

4 files changed

Lines changed: 102 additions & 3 deletions

File tree

serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public ServingService servingService(
6969
BigQueryConfig bqConfig = store.getBigqueryConfig();
7070
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
7171
servingService =
72-
new BigQueryServingService(bigquery, bqConfig.getProjectId(), bqConfig.getDatasetId());
72+
new BigQueryServingService(bigquery, bqConfig.getProjectId(), bqConfig.getDatasetId(), specService);
7373
break;
7474
case CASSANDRA:
7575
case UNRECOGNIZED:

serving/src/main/java/feast/serving/service/BigQueryServingService.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package feast.serving.service;
22

33
import com.google.cloud.bigquery.BigQuery;
4+
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
5+
import feast.core.CoreServiceProto.GetFeatureSetsRequest.Filter;
6+
import feast.core.FeatureSetProto.FeatureSetSpec;
47
import feast.serving.ServingAPIProto.FeastServingType;
58
import feast.serving.ServingAPIProto.GetBatchFeaturesFromCompletedJobRequest;
69
import feast.serving.ServingAPIProto.GetBatchFeaturesFromCompletedJobResponse;
@@ -15,17 +18,23 @@
1518
import feast.serving.ServingAPIProto.LoadBatchFeaturesResponse;
1619
import feast.serving.ServingAPIProto.ReloadJobStatusRequest;
1720
import feast.serving.ServingAPIProto.ReloadJobStatusResponse;
21+
import feast.serving.util.BigQueryUtil;
1822
import io.grpc.Status;
23+
import java.util.List;
24+
import java.util.stream.Collectors;
1925

2026
public class BigQueryServingService implements ServingService {
2127
private final BigQuery bigquery;
2228
private final String projectId;
2329
private final String datasetId;
30+
private final SpecService specService;
2431

25-
public BigQueryServingService(BigQuery bigquery, String projectId, String datasetId) {
32+
public BigQueryServingService(
33+
BigQuery bigquery, String projectId, String datasetId, SpecService specService) {
2634
this.bigquery = bigquery;
2735
this.projectId = projectId;
2836
this.datasetId = datasetId;
37+
this.specService = specService;
2938
}
3039

3140
@Override
@@ -43,7 +52,26 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest getFeature
4352

4453
@Override
4554
public GetBatchFeaturesResponse getBatchFeatures(GetFeaturesRequest getFeaturesRequest) {
46-
return null;
55+
List<FeatureSetSpec> featureSetSpecs =
56+
getFeaturesRequest.getFeatureSetsList().stream()
57+
.map(
58+
featureSet ->
59+
specService.getFeatureSets(
60+
GetFeatureSetsRequest.newBuilder()
61+
.setFilter(
62+
Filter.newBuilder().setFeatureSetName(featureSet.getName()).build())
63+
.build()))
64+
.map(response -> response.getFeatureSetsList().get(0))
65+
.collect(Collectors.toList());
66+
String query =
67+
BigQueryUtil.createQuery(
68+
getFeaturesRequest.getFeatureSetsList(),
69+
featureSetSpecs,
70+
getFeaturesRequest.getEntityDataset().getEntityNamesList(),
71+
getFeaturesRequest.getEntityDataset().getEntityDatasetRowsList(),
72+
datasetId);
73+
System.out.println(query);
74+
return GetBatchFeaturesResponse.getDefaultInstance();
4775
}
4876

4977
@Override
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package feast.serving.service;
2+
3+
import feast.serving.ServingAPIProto.Job;
4+
import java.util.Optional;
5+
6+
// JobService interface specifies the operations to manage Job instances internally in Feast
7+
8+
public interface JobService {
9+
10+
/**
11+
* Get Job by job id.
12+
*
13+
* @param id job id
14+
* @return feast.serving.ServingAPIProto.Job
15+
*/
16+
public Optional<Job> get(String id);
17+
18+
/**
19+
* Update or create a job (if not exists)
20+
*
21+
* @param job feast.serving.ServingAPIProto.Job
22+
*/
23+
public void upsert(Job job);
24+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package feast.serving.service;
2+
3+
import com.google.protobuf.InvalidProtocolBufferException;
4+
import com.google.protobuf.util.JsonFormat;
5+
import feast.serving.ServingAPIProto.Job;
6+
import feast.serving.ServingAPIProto.Job.Builder;
7+
import java.util.Optional;
8+
import lombok.extern.slf4j.Slf4j;
9+
import redis.clients.jedis.JedisPool;
10+
11+
@Slf4j
12+
public class RedisBackedJobService implements JobService {
13+
private final JedisPool jedisPool;
14+
15+
public RedisBackedJobService(JedisPool jedisPool) {
16+
this.jedisPool = jedisPool;
17+
}
18+
19+
@Override
20+
public Optional<Job> get(String id) {
21+
String json = jedisPool.getResource().get(id);
22+
if (json == null) {
23+
return Optional.empty();
24+
}
25+
Job job = null;
26+
Builder builder = Job.newBuilder();
27+
try {
28+
JsonFormat.parser().merge(json, builder);
29+
job = builder.build();
30+
} catch (InvalidProtocolBufferException e) {
31+
log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));
32+
}
33+
34+
return Optional.ofNullable(job);
35+
}
36+
37+
@Override
38+
public void upsert(Job job) {
39+
try {
40+
jedisPool
41+
.getResource()
42+
.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job));
43+
} catch (InvalidProtocolBufferException e) {
44+
log.error(String.format("Failed to upsert job: %s", e.getMessage()));
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)