Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7534bfb
Update feast dep to 0.12
achals Sep 20, 2021
cd0f2b9
Port feast 0.10+ data model to feast-serving
achals Sep 22, 2021
3bef6ec
Fix tests
achals Sep 22, 2021
c68fb3d
Fix integ tests
achals Sep 22, 2021
42efa34
Fix integ tests
achals Sep 22, 2021
265b984
remove logging
achals Sep 22, 2021
7f779ce
Fix ilnt
achals Sep 22, 2021
f4d68fb
Fix serialization
achals Sep 22, 2021
34ec33c
Implement EntityKeySerialization correctly
achals Sep 25, 2021
676d57e
Update workflows
achals Sep 25, 2021
e0edb68
Update python version
achals Sep 25, 2021
771df7b
Change redis ports
achals Sep 26, 2021
42d7147
materialize into redis
achals Sep 27, 2021
087f4e6
fix path
achals Sep 27, 2021
702094e
Install redis vairant
achals Sep 27, 2021
83039f8
Remove odfv
achals Sep 27, 2021
3f8d26e
Include test file
achals Sep 27, 2021
76ce225
update source
achals Sep 27, 2021
66c8d7e
update source
achals Sep 27, 2021
100e37f
update source
achals Sep 28, 2021
e1e1f36
update source
achals Sep 28, 2021
7ae280e
Wrestling with spring
achals Oct 3, 2021
efc13b9
Tests
achals Oct 5, 2021
dfff7b9
Remove github action
achals Oct 5, 2021
c544ea1
Add registry
achals Oct 5, 2021
2e1a447
Remove redundant stuff
achals Oct 5, 2021
9de9d01
Rename test
achals Oct 5, 2021
f303101
awaitTermination
achals Oct 5, 2021
588f149
lint
achals Oct 6, 2021
2bbac62
lint
achals Oct 6, 2021
79757e4
dynamic properties instead
achals Oct 6, 2021
749c4d0
dirtiescontext
achals Oct 6, 2021
ebce9ac
python 3.7
achals Oct 6, 2021
174d757
spotless
achals Oct 6, 2021
8e7ed44
Dirty Context after test method as well
achals Oct 6, 2021
cd011c3
Cleanup
achals Oct 6, 2021
28a2b2b
Cleanup
achals Oct 6, 2021
f8f1c0a
cr
achals Oct 6, 2021
ba1420a
spotless
achals Oct 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/complete.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ jobs:
integration-test:
runs-on: ubuntu-latest
needs: unit-test-java
services:
redis:
image: redis
ports:
- 6389:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -54,7 +64,7 @@ jobs:
architecture: x64
- uses: actions/setup-python@v2
with:
python-version: '3.6'
python-version: '3.7'
architecture: 'x64'
- uses: actions/cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion deps/feast
Submodule feast updated 883 files
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.util.concurrent.ScheduledExecutorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnBean(CoreCondition.class)
Comment thread
woop marked this conversation as resolved.
public class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> {

@Autowired ScheduledExecutorService executor;
Expand Down
34 changes: 34 additions & 0 deletions serving/src/main/java/feast/serving/config/CoreCondition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.config;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
* A {@link Condition} to signal that the ServingService should get feature definitions and metadata
* from Core service.
*/
public class CoreCondition implements Condition {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadocs?

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
final Environment env = context.getEnvironment();
return env.getProperty("feast.registry") == null;
}
}
11 changes: 10 additions & 1 deletion serving/src/main/java/feast/serving/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ public FeastProperties() {}
/* Feast Core port to connect to. */
@Positive private int coreGrpcPort;

private String registry;

public String getRegistry() {
return registry;
}

public void setRegistry(final String registry) {
this.registry = registry;
}

private CoreAuthenticationProperties coreAuthentication;

public CoreAuthenticationProperties getCoreAuthentication() {
Expand All @@ -82,7 +92,6 @@ public void setCoreAuthentication(CoreAuthenticationProperties coreAuthenticatio
this.coreAuthentication = coreAuthentication;
}

/* Feast Core port to connect to. */
@Positive private int coreCacheRefreshInterval;

private SecurityProperties security;
Expand Down
36 changes: 36 additions & 0 deletions serving/src/main/java/feast/serving/config/RegistryCondition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.config;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
* A {@link Condition} to signal that the ServingService should get feature definitions and metadata
* from the Registry object. This is needed for versions of the feature store written by feast
* 0.10+.
*/
public class RegistryCondition implements Condition {
Comment thread
adchia marked this conversation as resolved.

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
final Environment env = context.getEnvironment();
return env.getProperty("feast.registry") != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.protobuf.AbstractMessageLite;
import feast.serving.registry.LocalRegistryRepo;
import feast.serving.service.OnlineServingServiceV2;
import feast.serving.service.ServingServiceV2;
import feast.serving.specs.CachedSpecService;
import feast.serving.specs.CoreFeatureSpecRetriever;
import feast.serving.specs.FeatureSpecRetriever;
import feast.serving.specs.RegistryFeatureSpecRetriever;
import feast.storage.api.retriever.OnlineRetrieverV2;
import feast.storage.connectors.bigtable.retriever.BigTableOnlineRetriever;
import feast.storage.connectors.bigtable.retriever.BigTableStoreConfig;
Expand All @@ -32,13 +37,15 @@
import io.opentracing.Tracer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

Expand All @@ -64,27 +71,26 @@ public BigtableDataClient bigtableClient(FeastProperties feastProperties) throws
}

@Bean
@Conditional(CoreCondition.class)
public ServingServiceV2 servingServiceV2(
FeastProperties feastProperties, CachedSpecService specService, Tracer tracer) {
ServingServiceV2 servingService = null;
FeastProperties.Store store = feastProperties.getActiveStore();
final ServingServiceV2 servingService;
final FeastProperties.Store store = feastProperties.getActiveStore();

OnlineRetrieverV2 retrieverV2;
switch (store.getType()) {
case REDIS_CLUSTER:
RedisClientAdapter redisClusterClient =
RedisClusterClient.create(store.getRedisClusterConfig());
OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient);
servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer);
retrieverV2 = new OnlineRetriever(redisClusterClient, (AbstractMessageLite::toByteArray));
break;
case REDIS:
RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient);
servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer);
retrieverV2 = new OnlineRetriever(redisClient, (AbstractMessageLite::toByteArray));
break;
case BIGTABLE:
BigtableDataClient bigtableClient = context.getBean(BigtableDataClient.class);
OnlineRetrieverV2 bigtableRetriever = new BigTableOnlineRetriever(bigtableClient);
servingService = new OnlineServingServiceV2(bigtableRetriever, specService, tracer);
retrieverV2 = new BigTableOnlineRetriever(bigtableClient);
break;
case CASSANDRA:
CassandraStoreConfig config = feastProperties.getActiveStore().getCassandraConfig();
Expand All @@ -109,11 +115,57 @@ public ServingServiceV2 servingServiceV2(
.withLocalDatacenter(dataCenter)
.withKeyspace(keySpace)
.build();
OnlineRetrieverV2 cassandraRetriever = new CassandraOnlineRetriever(session);
servingService = new OnlineServingServiceV2(cassandraRetriever, specService, tracer);
retrieverV2 = new CassandraOnlineRetriever(session);
break;
default:
throw new RuntimeException(
String.format("Unable to identify online store type: %s", store.getType()));
}

final FeatureSpecRetriever featureSpecRetriever;
log.info("Created CoreFeatureSpecRetriever");
featureSpecRetriever = new CoreFeatureSpecRetriever(specService);

servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever);

return servingService;
}

@Bean
@Conditional(RegistryCondition.class)
public ServingServiceV2 registryBasedServingServiceV2(
FeastProperties feastProperties, Tracer tracer) {
final ServingServiceV2 servingService;
final FeastProperties.Store store = feastProperties.getActiveStore();

OnlineRetrieverV2 retrieverV2;
// TODO: Support more store types, and potentially use a plugin model here.
switch (store.getType()) {
Comment thread
adchia marked this conversation as resolved.
case REDIS_CLUSTER:
RedisClientAdapter redisClusterClient =
RedisClusterClient.create(store.getRedisClusterConfig());
retrieverV2 = new OnlineRetriever(redisClusterClient, new EntityKeySerializerV2());
break;
case REDIS:
RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
log.info("Created EntityKeySerializerV2");
retrieverV2 = new OnlineRetriever(redisClient, new EntityKeySerializerV2());
break;
default:
throw new RuntimeException(
String.format(
"Unable to identify online store type: %s for Regsitry Backed Serving Service",
Comment thread
felixwang9817 marked this conversation as resolved.
store.getType()));
}

final FeatureSpecRetriever featureSpecRetriever;
log.info("Created RegistryFeatureSpecRetriever");
log.info("Working Directory = " + System.getProperty("user.dir"));
final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry()));
featureSpecRetriever = new RegistryFeatureSpecRetriever(repo);

servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever);

return servingService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package feast.serving.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.serving.specs.CachedSpecService;
import feast.serving.specs.CoreSpecService;
import io.grpc.CallCredentials;
Expand All @@ -28,15 +26,16 @@
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpecServiceConfig {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class);
private String feastCoreHost;
private int feastCorePort;
private int feastCachedSpecServiceRefreshInterval;
private final String feastCoreHost;
private final int feastCorePort;
private final int feastCachedSpecServiceRefreshInterval;

@Autowired
public SpecServiceConfig(FeastProperties feastProperties) {
Expand All @@ -46,6 +45,7 @@ public SpecServiceConfig(FeastProperties feastProperties) {
}

@Bean
@Conditional(CoreCondition.class)
public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
CachedSpecService cachedSpecStorage) {
ScheduledExecutorService scheduledExecutorService =
Expand All @@ -60,8 +60,8 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
}

@Bean
public CachedSpecService specService(ObjectProvider<CallCredentials> callCredentials)
throws InvalidProtocolBufferException, JsonProcessingException {
@Conditional(CoreCondition.class)
public CachedSpecService specService(ObjectProvider<CallCredentials> callCredentials) {
CoreSpecService coreService =
new CoreSpecService(feastCoreHost, feastCorePort, callCredentials);
CachedSpecService cachedSpecStorage = new CachedSpecService(coreService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
import feast.serving.interceptors.GrpcMonitoringInterceptor;
import feast.serving.service.ServingServiceV2;
import feast.serving.specs.CachedSpecService;
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
import io.grpc.health.v1.HealthProto.HealthCheckRequest;
import io.grpc.health.v1.HealthProto.HealthCheckResponse;
Expand All @@ -32,12 +31,10 @@

@GrpcService(interceptors = {GrpcMonitoringInterceptor.class})
public class HealthServiceController extends HealthImplBase {
private CachedSpecService specService;
private ServingServiceV2 servingService;
private final ServingServiceV2 servingService;

@Autowired
public HealthServiceController(CachedSpecService specService, ServingServiceV2 servingService) {
this.specService = specService;
public HealthServiceController(final ServingServiceV2 servingService) {
this.servingService = servingService;
}

Expand All @@ -47,7 +44,7 @@ public void check(
// TODO: Implement proper logic to determine if ServingServiceV2 is healthy e.g.
// if it's online service check that it the service can retrieve dummy/random
// feature table.
// Implement similary for batch service.
// Implement similarly for batch service.

try {
servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance());
Expand Down
Loading