diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index 7f6c3fe..c33ff8f 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -42,6 +42,16 @@ jobs: integration-test: runs-on: ubuntu-latest needs: unit-test-java + services: + redis: + image: redis + ports: + - 6389:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v2 with: @@ -54,7 +64,7 @@ jobs: architecture: x64 - uses: actions/setup-python@v2 with: - python-version: '3.6' + python-version: '3.7' architecture: 'x64' - uses: actions/cache@v2 with: diff --git a/deps/feast b/deps/feast index 8010d2f..db43faf 160000 --- a/deps/feast +++ b/deps/feast @@ -1 +1 @@ -Subproject commit 8010d2f35d3f876db54a31b8012b13009cd5eba2 +Subproject commit db43faf7bd1385eb46f8e489766f6609abf753b9 diff --git a/serving/src/main/java/feast/serving/config/ContextClosedHandler.java b/serving/src/main/java/feast/serving/config/ContextClosedHandler.java index 2bc9743..cdf791c 100644 --- a/serving/src/main/java/feast/serving/config/ContextClosedHandler.java +++ b/serving/src/main/java/feast/serving/config/ContextClosedHandler.java @@ -18,11 +18,13 @@ import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; import org.springframework.stereotype.Component; @Component +@ConditionalOnBean(CoreCondition.class) public class ContextClosedHandler implements ApplicationListener { @Autowired ScheduledExecutorService executor; diff --git a/serving/src/main/java/feast/serving/config/CoreCondition.java b/serving/src/main/java/feast/serving/config/CoreCondition.java new file mode 100644 index 0000000..10dabfa --- /dev/null +++ b/serving/src/main/java/feast/serving/config/CoreCondition.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + +/** + * A {@link Condition} to signal that the ServingService should get feature definitions and metadata + * from Core service. + */ +public class CoreCondition implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + final Environment env = context.getEnvironment(); + return env.getProperty("feast.registry") == null; + } +} diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java index 1b62d84..9a60923 100644 --- a/serving/src/main/java/feast/serving/config/FeastProperties.java +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -72,6 +72,16 @@ public FeastProperties() {} /* Feast Core port to connect to. */ @Positive private int coreGrpcPort; + private String registry; + + public String getRegistry() { + return registry; + } + + public void setRegistry(final String registry) { + this.registry = registry; + } + private CoreAuthenticationProperties coreAuthentication; public CoreAuthenticationProperties getCoreAuthentication() { @@ -82,7 +92,6 @@ public void setCoreAuthentication(CoreAuthenticationProperties coreAuthenticatio this.coreAuthentication = coreAuthentication; } - /* Feast Core port to connect to. */ @Positive private int coreCacheRefreshInterval; private SecurityProperties security; diff --git a/serving/src/main/java/feast/serving/config/RegistryCondition.java b/serving/src/main/java/feast/serving/config/RegistryCondition.java new file mode 100644 index 0000000..621d124 --- /dev/null +++ b/serving/src/main/java/feast/serving/config/RegistryCondition.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + +/** + * A {@link Condition} to signal that the ServingService should get feature definitions and metadata + * from the Registry object. This is needed for versions of the feature store written by feast + * 0.10+. + */ +public class RegistryCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + final Environment env = context.getEnvironment(); + return env.getProperty("feast.registry") != null; + } +} diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java index 9d50a6a..d1ac636 100644 --- a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java +++ b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java @@ -20,9 +20,14 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.protobuf.AbstractMessageLite; +import feast.serving.registry.LocalRegistryRepo; import feast.serving.service.OnlineServingServiceV2; import feast.serving.service.ServingServiceV2; import feast.serving.specs.CachedSpecService; +import feast.serving.specs.CoreFeatureSpecRetriever; +import feast.serving.specs.FeatureSpecRetriever; +import feast.serving.specs.RegistryFeatureSpecRetriever; import feast.storage.api.retriever.OnlineRetrieverV2; import feast.storage.connectors.bigtable.retriever.BigTableOnlineRetriever; import feast.storage.connectors.bigtable.retriever.BigTableStoreConfig; @@ -32,6 +37,7 @@ import io.opentracing.Tracer; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -39,6 +45,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; @@ -64,27 +71,26 @@ public BigtableDataClient bigtableClient(FeastProperties feastProperties) throws } @Bean + @Conditional(CoreCondition.class) public ServingServiceV2 servingServiceV2( FeastProperties feastProperties, CachedSpecService specService, Tracer tracer) { - ServingServiceV2 servingService = null; - FeastProperties.Store store = feastProperties.getActiveStore(); + final ServingServiceV2 servingService; + final FeastProperties.Store store = feastProperties.getActiveStore(); + OnlineRetrieverV2 retrieverV2; switch (store.getType()) { case REDIS_CLUSTER: RedisClientAdapter redisClusterClient = RedisClusterClient.create(store.getRedisClusterConfig()); - OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient); - servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer); + retrieverV2 = new OnlineRetriever(redisClusterClient, (AbstractMessageLite::toByteArray)); break; case REDIS: RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); - OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient); - servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer); + retrieverV2 = new OnlineRetriever(redisClient, (AbstractMessageLite::toByteArray)); break; case BIGTABLE: BigtableDataClient bigtableClient = context.getBean(BigtableDataClient.class); - OnlineRetrieverV2 bigtableRetriever = new BigTableOnlineRetriever(bigtableClient); - servingService = new OnlineServingServiceV2(bigtableRetriever, specService, tracer); + retrieverV2 = new BigTableOnlineRetriever(bigtableClient); break; case CASSANDRA: CassandraStoreConfig config = feastProperties.getActiveStore().getCassandraConfig(); @@ -109,11 +115,57 @@ public ServingServiceV2 servingServiceV2( .withLocalDatacenter(dataCenter) .withKeyspace(keySpace) .build(); - OnlineRetrieverV2 cassandraRetriever = new CassandraOnlineRetriever(session); - servingService = new OnlineServingServiceV2(cassandraRetriever, specService, tracer); + retrieverV2 = new CassandraOnlineRetriever(session); break; + default: + throw new RuntimeException( + String.format("Unable to identify online store type: %s", store.getType())); } + final FeatureSpecRetriever featureSpecRetriever; + log.info("Created CoreFeatureSpecRetriever"); + featureSpecRetriever = new CoreFeatureSpecRetriever(specService); + + servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever); + + return servingService; + } + + @Bean + @Conditional(RegistryCondition.class) + public ServingServiceV2 registryBasedServingServiceV2( + FeastProperties feastProperties, Tracer tracer) { + final ServingServiceV2 servingService; + final FeastProperties.Store store = feastProperties.getActiveStore(); + + OnlineRetrieverV2 retrieverV2; + // TODO: Support more store types, and potentially use a plugin model here. + switch (store.getType()) { + case REDIS_CLUSTER: + RedisClientAdapter redisClusterClient = + RedisClusterClient.create(store.getRedisClusterConfig()); + retrieverV2 = new OnlineRetriever(redisClusterClient, new EntityKeySerializerV2()); + break; + case REDIS: + RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); + log.info("Created EntityKeySerializerV2"); + retrieverV2 = new OnlineRetriever(redisClient, new EntityKeySerializerV2()); + break; + default: + throw new RuntimeException( + String.format( + "Unable to identify online store type: %s for Regsitry Backed Serving Service", + store.getType())); + } + + final FeatureSpecRetriever featureSpecRetriever; + log.info("Created RegistryFeatureSpecRetriever"); + log.info("Working Directory = " + System.getProperty("user.dir")); + final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry())); + featureSpecRetriever = new RegistryFeatureSpecRetriever(repo); + + servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever); + return servingService; } } diff --git a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java index 369d543..29d3bf0 100644 --- a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java @@ -16,8 +16,6 @@ */ package feast.serving.config; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.protobuf.InvalidProtocolBufferException; import feast.serving.specs.CachedSpecService; import feast.serving.specs.CoreSpecService; import io.grpc.CallCredentials; @@ -28,15 +26,16 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @Configuration public class SpecServiceConfig { private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class); - private String feastCoreHost; - private int feastCorePort; - private int feastCachedSpecServiceRefreshInterval; + private final String feastCoreHost; + private final int feastCorePort; + private final int feastCachedSpecServiceRefreshInterval; @Autowired public SpecServiceConfig(FeastProperties feastProperties) { @@ -46,6 +45,7 @@ public SpecServiceConfig(FeastProperties feastProperties) { } @Bean + @Conditional(CoreCondition.class) public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( CachedSpecService cachedSpecStorage) { ScheduledExecutorService scheduledExecutorService = @@ -60,8 +60,8 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( } @Bean - public CachedSpecService specService(ObjectProvider callCredentials) - throws InvalidProtocolBufferException, JsonProcessingException { + @Conditional(CoreCondition.class) + public CachedSpecService specService(ObjectProvider callCredentials) { CoreSpecService coreService = new CoreSpecService(feastCoreHost, feastCorePort, callCredentials); CachedSpecService cachedSpecStorage = new CachedSpecService(coreService); diff --git a/serving/src/main/java/feast/serving/controller/HealthServiceController.java b/serving/src/main/java/feast/serving/controller/HealthServiceController.java index 4bee981..ef675d4 100644 --- a/serving/src/main/java/feast/serving/controller/HealthServiceController.java +++ b/serving/src/main/java/feast/serving/controller/HealthServiceController.java @@ -19,7 +19,6 @@ import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest; import feast.serving.interceptors.GrpcMonitoringInterceptor; import feast.serving.service.ServingServiceV2; -import feast.serving.specs.CachedSpecService; import io.grpc.health.v1.HealthGrpc.HealthImplBase; import io.grpc.health.v1.HealthProto.HealthCheckRequest; import io.grpc.health.v1.HealthProto.HealthCheckResponse; @@ -32,12 +31,10 @@ @GrpcService(interceptors = {GrpcMonitoringInterceptor.class}) public class HealthServiceController extends HealthImplBase { - private CachedSpecService specService; - private ServingServiceV2 servingService; + private final ServingServiceV2 servingService; @Autowired - public HealthServiceController(CachedSpecService specService, ServingServiceV2 servingService) { - this.specService = specService; + public HealthServiceController(final ServingServiceV2 servingService) { this.servingService = servingService; } @@ -47,7 +44,7 @@ public void check( // TODO: Implement proper logic to determine if ServingServiceV2 is healthy e.g. // if it's online service check that it the service can retrieve dummy/random // feature table. - // Implement similary for batch service. + // Implement similarly for batch service. try { servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance()); diff --git a/serving/src/main/java/feast/serving/registry/LocalRegistryRepo.java b/serving/src/main/java/feast/serving/registry/LocalRegistryRepo.java new file mode 100644 index 0000000..ff41dd4 --- /dev/null +++ b/serving/src/main/java/feast/serving/registry/LocalRegistryRepo.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.registry; + +import feast.proto.core.FeatureProto; +import feast.proto.core.FeatureViewProto; +import feast.proto.core.RegistryProto; +import feast.proto.serving.ServingAPIProto; +import feast.serving.exception.SpecRetrievalException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class LocalRegistryRepo implements RegistryRepository { + private final RegistryProto.Registry registry; + + public LocalRegistryRepo(Path localRegistryPath) { + if (!localRegistryPath.toFile().exists()) { + throw new RuntimeException( + String.format("Local registry not found at path %s", localRegistryPath)); + } + try { + final byte[] registryContents = Files.readAllBytes(localRegistryPath); + this.registry = RegistryProto.Registry.parseFrom(registryContents); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public RegistryProto.Registry getRegistry() { + return this.registry; + } + + @Override + public FeatureViewProto.FeatureViewSpec getFeatureViewSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + final RegistryProto.Registry registry = this.getRegistry(); + for (final FeatureViewProto.FeatureView featureView : registry.getFeatureViewsList()) { + if (featureView.getSpec().getName().equals(featureReference.getFeatureTable())) { + return featureView.getSpec(); + } + } + throw new SpecRetrievalException( + String.format( + "Unable to find feature view with name: %s", featureReference.getFeatureTable())); + } + + @Override + public FeatureProto.FeatureSpecV2 getFeatureSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + final FeatureViewProto.FeatureViewSpec spec = + this.getFeatureViewSpec(projectName, featureReference); + for (final FeatureProto.FeatureSpecV2 featureSpec : spec.getFeaturesList()) { + if (featureSpec.getName().equals(featureReference.getName())) { + return featureSpec; + } + } + + throw new SpecRetrievalException( + String.format( + "Unable to find feature with name: %s in feature view: %s", + featureReference.getName(), featureReference.getFeatureTable())); + } +} diff --git a/serving/src/main/java/feast/serving/registry/RegistryRepository.java b/serving/src/main/java/feast/serving/registry/RegistryRepository.java new file mode 100644 index 0000000..79634ee --- /dev/null +++ b/serving/src/main/java/feast/serving/registry/RegistryRepository.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.registry; + +import feast.proto.core.FeatureProto; +import feast.proto.core.FeatureViewProto; +import feast.proto.core.RegistryProto; +import feast.proto.serving.ServingAPIProto; + +/** + * RegistryRepository allows the ServingService to retrieve feature definitions from a Registry + * object. This approach is needed for a feature store created using feast 0.10+. + */ +public interface RegistryRepository { + RegistryProto.Registry getRegistry(); + + FeatureViewProto.FeatureViewSpec getFeatureViewSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference); + + FeatureProto.FeatureSpecV2 getFeatureSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference); +} diff --git a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java index 1d35edd..a9a1b8f 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java @@ -28,7 +28,7 @@ import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.proto.types.ValueProto; import feast.serving.exception.SpecRetrievalException; -import feast.serving.specs.CachedSpecService; +import feast.serving.specs.FeatureSpecRetriever; import feast.serving.util.Metrics; import feast.storage.api.retriever.Feature; import feast.storage.api.retriever.OnlineRetrieverV2; @@ -45,15 +45,15 @@ public class OnlineServingServiceV2 implements ServingServiceV2 { private static final Logger log = org.slf4j.LoggerFactory.getLogger(OnlineServingServiceV2.class); - private final CachedSpecService specService; private final Tracer tracer; private final OnlineRetrieverV2 retriever; + private final FeatureSpecRetriever featureSpecRetriever; public OnlineServingServiceV2( - OnlineRetrieverV2 retriever, CachedSpecService specService, Tracer tracer) { + OnlineRetrieverV2 retriever, Tracer tracer, FeatureSpecRetriever featureSpecRetriever) { this.retriever = retriever; - this.specService = specService; this.tracer = tracer; + this.featureSpecRetriever = featureSpecRetriever; } /** {@inheritDoc} */ @@ -94,10 +94,10 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re .collect( Collectors.toMap( Function.identity(), - ref -> specService.getFeatureTableSpec(finalProjectName, ref).getMaxAge())); + ref -> this.featureSpecRetriever.getMaxAge(finalProjectName, ref))); List entityNames = featureReferences.stream() - .map(ref -> specService.getFeatureTableSpec(finalProjectName, ref).getEntitiesList()) + .map(ref -> this.featureSpecRetriever.getEntitiesList(finalProjectName, ref)) .findFirst() .get(); @@ -109,7 +109,9 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re Function.identity(), ref -> { try { - return specService.getFeatureSpec(finalProjectName, ref).getValueType(); + return this.featureSpecRetriever + .getFeatureSpec(finalProjectName, ref) + .getValueType(); } catch (SpecRetrievalException e) { return ValueProto.ValueType.Enum.INVALID; } diff --git a/serving/src/main/java/feast/serving/specs/CoreFeatureSpecRetriever.java b/serving/src/main/java/feast/serving/specs/CoreFeatureSpecRetriever.java new file mode 100644 index 0000000..fc24a10 --- /dev/null +++ b/serving/src/main/java/feast/serving/specs/CoreFeatureSpecRetriever.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.specs; + +import com.google.protobuf.Duration; +import feast.proto.core.FeatureProto; +import feast.proto.serving.ServingAPIProto; +import java.util.List; + +public class CoreFeatureSpecRetriever implements FeatureSpecRetriever { + private final CachedSpecService specService; + + public CoreFeatureSpecRetriever(CachedSpecService specService) { + this.specService = specService; + } + + @Override + public Duration getMaxAge( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + return this.specService.getFeatureTableSpec(projectName, featureReference).getMaxAge(); + } + + @Override + public List getEntitiesList( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + return this.specService.getFeatureTableSpec(projectName, featureReference).getEntitiesList(); + } + + @Override + public FeatureProto.FeatureSpecV2 getFeatureSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + return this.specService.getFeatureSpec(projectName, featureReference); + } +} diff --git a/serving/src/main/java/feast/serving/specs/FeatureSpecRetriever.java b/serving/src/main/java/feast/serving/specs/FeatureSpecRetriever.java new file mode 100644 index 0000000..91bc7fe --- /dev/null +++ b/serving/src/main/java/feast/serving/specs/FeatureSpecRetriever.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.specs; + +import com.google.protobuf.Duration; +import feast.proto.core.FeatureProto; +import feast.proto.serving.ServingAPIProto; +import java.util.List; + +public interface FeatureSpecRetriever { + + Duration getMaxAge(String projectName, ServingAPIProto.FeatureReferenceV2 featureReference); + + List getEntitiesList( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference); + + FeatureProto.FeatureSpecV2 getFeatureSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference); +} diff --git a/serving/src/main/java/feast/serving/specs/RegistryFeatureSpecRetriever.java b/serving/src/main/java/feast/serving/specs/RegistryFeatureSpecRetriever.java new file mode 100644 index 0000000..24026b1 --- /dev/null +++ b/serving/src/main/java/feast/serving/specs/RegistryFeatureSpecRetriever.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.specs; + +import com.google.protobuf.Duration; +import feast.proto.core.FeatureProto; +import feast.proto.core.FeatureViewProto; +import feast.proto.core.RegistryProto; +import feast.proto.serving.ServingAPIProto; +import feast.serving.exception.SpecRetrievalException; +import feast.serving.registry.RegistryRepository; +import java.util.List; + +public class RegistryFeatureSpecRetriever implements FeatureSpecRetriever { + private final RegistryRepository registryRepository; + + public RegistryFeatureSpecRetriever(RegistryRepository registryRepository) { + this.registryRepository = registryRepository; + } + + @Override + public Duration getMaxAge( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + final RegistryProto.Registry registry = this.registryRepository.getRegistry(); + for (final FeatureViewProto.FeatureView featureView : registry.getFeatureViewsList()) { + if (featureView.getSpec().getName().equals(featureReference.getFeatureTable())) { + return featureView.getSpec().getTtl(); + } + } + throw new SpecRetrievalException( + String.format( + "Unable to find feature view with name: %s", featureReference.getFeatureTable())); + } + + @Override + public List getEntitiesList( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + final RegistryProto.Registry registry = this.registryRepository.getRegistry(); + for (final FeatureViewProto.FeatureView featureView : registry.getFeatureViewsList()) { + if (featureView.getSpec().getName().equals(featureReference.getFeatureTable())) { + return featureView.getSpec().getEntitiesList(); + } + } + throw new SpecRetrievalException( + String.format( + "Unable to find feature view with name: %s", featureReference.getFeatureTable())); + } + + @Override + public FeatureProto.FeatureSpecV2 getFeatureSpec( + String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { + return this.registryRepository.getFeatureSpec(projectName, featureReference); + } +} diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index f8187e9..3e4e07b 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -3,7 +3,7 @@ feast: # Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information) core-host: ${FEAST_CORE_HOST:localhost} core-grpc-port: ${FEAST_CORE_GRPC_PORT:6565} - + core-authentication: enabled: false # should be set to true if authentication is enabled on core. provider: google # can be set to `oauth` or `google` diff --git a/serving/src/test/java/feast/serving/it/ServingServiceFeast10IT.java b/serving/src/test/java/feast/serving/it/ServingServiceFeast10IT.java new file mode 100644 index 0000000..c1e7a15 --- /dev/null +++ b/serving/src/test/java/feast/serving/it/ServingServiceFeast10IT.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.it; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Timestamp; +import feast.common.it.DataGenerator; +import feast.proto.serving.ServingAPIProto; +import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; +import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; +import feast.proto.serving.ServingServiceGrpc; +import io.grpc.ManagedChannel; +import java.io.File; +import java.util.concurrent.TimeUnit; +import org.junit.ClassRule; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@ActiveProfiles("it") +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "feast.registry:src/test/resources/docker-compose/feast10/registry.db", + }) +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@Testcontainers +public class ServingServiceFeast10IT extends BaseAuthIT { + + public static final Logger log = LoggerFactory.getLogger(ServingServiceFeast10IT.class); + + static final String timestampPrefix = "_ts"; + static ServingServiceGrpc.ServingServiceBlockingStub servingStub; + + static final int FEAST_SERVING_PORT = 6568; + @LocalServerPort private int metricsPort; + + @ClassRule @Container + public static DockerComposeContainer environment = + new DockerComposeContainer( + new File("src/test/resources/docker-compose/docker-compose-feast10-it.yml")) + .withExposedService(REDIS, REDIS_PORT); + + @DynamicPropertySource + static void initialize(DynamicPropertyRegistry registry) { + registry.add("grpc.server.port", () -> FEAST_SERVING_PORT); + } + + @BeforeAll + static void globalSetup() { + servingStub = TestUtils.getServingServiceStub(false, FEAST_SERVING_PORT, null); + } + + @AfterAll + static void tearDown() throws Exception { + ((ManagedChannel) servingStub.getChannel()).shutdown().awaitTermination(10, TimeUnit.SECONDS); + } + + @Test + @DirtiesContext(methodMode = DirtiesContext.MethodMode.AFTER_METHOD) + public void shouldGetOnlineFeatures() { + // getOnlineFeatures Information + String projectName = "feast_project"; + String entityName = "driver_id"; + + // Instantiate EntityRows + final Timestamp timestamp = Timestamp.getDefaultInstance(); + GetOnlineFeaturesRequestV2.EntityRow entityRow1 = + DataGenerator.createEntityRow( + entityName, DataGenerator.createInt64Value(1001), timestamp.getSeconds()); + ImmutableList entityRows = ImmutableList.of(entityRow1); + + // Instantiate FeatureReferences + ServingAPIProto.FeatureReferenceV2 feature1Reference = + DataGenerator.createFeatureReference("driver_hourly_stats", "conv_rate"); + ServingAPIProto.FeatureReferenceV2 feature2Reference = + DataGenerator.createFeatureReference("driver_hourly_stats", "avg_daily_trips"); + ImmutableList featureReferences = + ImmutableList.of(feature1Reference, feature2Reference); + + // Build GetOnlineFeaturesRequestV2 + GetOnlineFeaturesRequestV2 onlineFeatureRequest = + TestUtils.createOnlineFeatureRequest(projectName, featureReferences, entityRows); + GetOnlineFeaturesResponse featureResponse = + servingStub.getOnlineFeaturesV2(onlineFeatureRequest); + + assertEquals(1, featureResponse.getFieldValuesCount()); + + final GetOnlineFeaturesResponse.FieldValues fieldValue = featureResponse.getFieldValues(0); + for (final String key : + ImmutableList.of( + "driver_hourly_stats:avg_daily_trips", "driver_hourly_stats:conv_rate", "driver_id")) { + assertTrue(fieldValue.containsFields(key)); + assertTrue(fieldValue.containsStatuses(key)); + assertEquals( + GetOnlineFeaturesResponse.FieldStatus.PRESENT, fieldValue.getStatusesOrThrow(key)); + } + + assertEquals( + 721, fieldValue.getFieldsOrThrow("driver_hourly_stats:avg_daily_trips").getInt64Val()); + assertEquals(1001, fieldValue.getFieldsOrThrow("driver_id").getInt64Val()); + assertEquals( + 0.74203354, + fieldValue.getFieldsOrThrow("driver_hourly_stats:conv_rate").getDoubleVal(), + 0.0001); + } +} diff --git a/serving/src/test/java/feast/serving/it/ServingServiceIT.java b/serving/src/test/java/feast/serving/it/ServingServiceIT.java index 8e0a82e..37cbbc3 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceIT.java @@ -212,7 +212,7 @@ public void shouldAllowUnauthenticatedAccessToMetricsEndpoint() throws IOExcepti .build(); Response response = new OkHttpClient().newCall(request).execute(); assertTrue(response.isSuccessful()); - assertTrue(!response.body().string().isEmpty()); + assertFalse(response.body().string().isEmpty()); } @Test diff --git a/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java b/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java index 83dbdf0..0f260b9 100644 --- a/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java +++ b/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java @@ -34,6 +34,7 @@ import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; import feast.proto.types.ValueProto; import feast.serving.specs.CachedSpecService; +import feast.serving.specs.CoreFeatureSpecRetriever; import feast.storage.api.retriever.Feature; import feast.storage.api.retriever.ProtoFeature; import feast.storage.connectors.redis.retriever.OnlineRetriever; @@ -61,7 +62,8 @@ public class OnlineServingServiceTest { @Before public void setUp() { initMocks(this); - onlineServingServiceV2 = new OnlineServingServiceV2(retrieverV2, specService, tracer); + onlineServingServiceV2 = + new OnlineServingServiceV2(retrieverV2, tracer, new CoreFeatureSpecRetriever(specService)); mockedFeatureRows = new ArrayList<>(); mockedFeatureRows.add( diff --git a/serving/src/test/resources/docker-compose/docker-compose-feast10-it.yml b/serving/src/test/resources/docker-compose/docker-compose-feast10-it.yml new file mode 100644 index 0000000..33d65f4 --- /dev/null +++ b/serving/src/test/resources/docker-compose/docker-compose-feast10-it.yml @@ -0,0 +1,18 @@ +version: '3' + +services: + db: + image: postgres:12-alpine + environment: + POSTGRES_PASSWORD: password + ports: + - "5432:5432" + redis: + image: redis:5-alpine + ports: + - "6379:6379" + materialize: + build: feast10 + links: + - redis + diff --git a/serving/src/test/resources/docker-compose/feast10/Dockerfile b/serving/src/test/resources/docker-compose/feast10/Dockerfile new file mode 100644 index 0000000..bde9f11 --- /dev/null +++ b/serving/src/test/resources/docker-compose/feast10/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.7 + +WORKDIR /usr/src/ + +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD [ "python", "./materialize.py" ] diff --git a/serving/src/test/resources/docker-compose/feast10/driver_stats.parquet b/serving/src/test/resources/docker-compose/feast10/driver_stats.parquet new file mode 100644 index 0000000..df8cbba Binary files /dev/null and b/serving/src/test/resources/docker-compose/feast10/driver_stats.parquet differ diff --git a/serving/src/test/resources/docker-compose/feast10/feature_store.yaml b/serving/src/test/resources/docker-compose/feast10/feature_store.yaml new file mode 100644 index 0000000..87e3310 --- /dev/null +++ b/serving/src/test/resources/docker-compose/feast10/feature_store.yaml @@ -0,0 +1,9 @@ +project: feast_project +provider: local +online_store: + type: redis + connection_string: "redis:6379" +offline_store: {} +flags: + alpha_features: true + on_demand_transforms: true diff --git a/serving/src/test/resources/docker-compose/feast10/materialize.py b/serving/src/test/resources/docker-compose/feast10/materialize.py new file mode 100644 index 0000000..6338c16 --- /dev/null +++ b/serving/src/test/resources/docker-compose/feast10/materialize.py @@ -0,0 +1,45 @@ +# This is an example feature definition file + +from google.protobuf.duration_pb2 import Duration + +from datetime import datetime +from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService, FeatureStore + +print("Running materialize.py") + +# Read data from parquet files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +file_path = "driver_stats.parquet" +driver_hourly_stats = FileSource( + path=file_path, + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) + +# Define an entity for the driver. You can think of entity as a primary key used to +# fetch features. +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=Duration(seconds=86400 * 365), + features=[ + Feature(name="conv_rate", dtype=ValueType.DOUBLE), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + online=True, + batch_source=driver_hourly_stats, + tags={}, +) + +fs = FeatureStore(".") +fs.apply([driver_hourly_stats_view, driver]) + +now = datetime.now() +fs.materialize_incremental(now) diff --git a/serving/src/test/resources/docker-compose/feast10/registry.db b/serving/src/test/resources/docker-compose/feast10/registry.db new file mode 100644 index 0000000..774b493 Binary files /dev/null and b/serving/src/test/resources/docker-compose/feast10/registry.db differ diff --git a/serving/src/test/resources/docker-compose/feast10/requirements.txt b/serving/src/test/resources/docker-compose/feast10/requirements.txt new file mode 100644 index 0000000..cb579a2 --- /dev/null +++ b/serving/src/test/resources/docker-compose/feast10/requirements.txt @@ -0,0 +1 @@ +feast[redis]>=0.13,<1 diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java index f78e22d..ce7d200 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java @@ -39,7 +39,7 @@ public class RedisHashDecoder { */ public static List retrieveFeature( List> redisHashValues, - Map byteToFeatureReferenceMap, + Map byteToFeatureReferenceMap, String timestampPrefix) throws InvalidProtocolBufferException { List allFeatures = new ArrayList<>(); @@ -57,7 +57,7 @@ public static List retrieveFeature( featureTableTimestampMap.put(new String(redisValueK), eventTimestamp); } else { ServingAPIProto.FeatureReferenceV2 featureReference = - byteToFeatureReferenceMap.get(redisValueK.toString()); + byteToFeatureReferenceMap.get(redisValueK); ValueProto.Value featureValue = ValueProto.Value.parseFrom(redisValueV); featureMap.put(featureReference, featureValue); diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializer.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializer.java new file mode 100644 index 0000000..6220dd2 --- /dev/null +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializer.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.storage.connectors.redis.retriever; + +import feast.proto.storage.RedisProto; + +@FunctionalInterface +public interface EntityKeySerializer { + byte[] serialize(final RedisProto.RedisKeyV2 entityKey); +} diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java new file mode 100644 index 0000000..922a09d --- /dev/null +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.storage.connectors.redis.retriever; + +import com.google.common.primitives.UnsignedBytes; +import com.google.protobuf.ProtocolStringList; +import feast.proto.storage.RedisProto; +import feast.proto.types.ValueProto; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +// This is derived from +// https://github.com/feast-dev/feast/blob/b1ccf8dd1535f721aee8bea937ee38feff80bec5/sdk/python/feast/infra/key_encoding_utils.py#L22 +// and must be kept up to date with any changes in that logic. +public class EntityKeySerializerV2 implements EntityKeySerializer { + + @Override + public byte[] serialize(RedisProto.RedisKeyV2 entityKey) { + final ProtocolStringList joinKeys = entityKey.getEntityNamesList(); + final List values = entityKey.getEntityValuesList(); + + assert joinKeys.size() == values.size(); + + final List buffer = new ArrayList<>(); + + final List> tuples = new ArrayList<>(joinKeys.size()); + for (int i = 0; i < joinKeys.size(); i++) { + tuples.add(Pair.of(joinKeys.get(i), values.get(i))); + } + tuples.sort(Comparator.comparing(Pair::getLeft)); + + ByteBuffer stringBytes = ByteBuffer.allocate(Integer.BYTES); + stringBytes.order(ByteOrder.LITTLE_ENDIAN); + stringBytes.putInt(ValueProto.ValueType.Enum.STRING.getNumber()); + + for (Pair pair : tuples) { + for (final byte b : stringBytes.array()) { + buffer.add(b); + } + for (final byte b : pair.getLeft().getBytes(StandardCharsets.UTF_8)) { + buffer.add(b); + } + } + + for (Pair pair : tuples) { + final ValueProto.Value val = pair.getRight(); + switch (val.getValCase()) { + case STRING_VAL: + buffer.add(UnsignedBytes.checkedCast(ValueProto.ValueType.Enum.STRING.getNumber())); + buffer.add( + UnsignedBytes.checkedCast( + val.getStringVal().getBytes(StandardCharsets.UTF_8).length)); + for (final byte b : val.getStringVal().getBytes(StandardCharsets.UTF_8)) { + buffer.add(b); + } + break; + case BYTES_VAL: + buffer.add(UnsignedBytes.checkedCast(ValueProto.ValueType.Enum.BYTES.getNumber())); + for (final byte b : val.getBytesVal().toByteArray()) { + buffer.add(b); + } + break; + case INT32_VAL: + ByteBuffer int32ByteBuffer = + ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + Integer.BYTES); + int32ByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + int32ByteBuffer.putInt(ValueProto.ValueType.Enum.INT32.getNumber()); + int32ByteBuffer.putInt(Integer.BYTES); + int32ByteBuffer.putInt(val.getInt32Val()); + for (final byte b : int32ByteBuffer.array()) { + buffer.add(b); + } + break; + case INT64_VAL: + ByteBuffer int64ByteBuffer = + ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + Integer.BYTES); + int64ByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + int64ByteBuffer.putInt(ValueProto.ValueType.Enum.INT64.getNumber()); + int64ByteBuffer.putInt(Integer.BYTES); + /* This is super dumb - but in https://github.com/feast-dev/feast/blob/dcae1606f53028ce5413567fb8b66f92cfef0f8e/sdk/python/feast/infra/key_encoding_utils.py#L9 + we use `struct.pack("> getFeaturesFromRedis( List featureReferences) { List> features = new ArrayList<>(); // To decode bytes back to Feature Reference - Map byteToFeatureReferenceMap = new HashMap<>(); + Map byteToFeatureReferenceMap = new HashMap<>(); // Serialize using proto List binaryRedisKeys = - redisKeys.stream().map(redisKey -> redisKey.toByteArray()).collect(Collectors.toList()); + redisKeys.stream().map(this.keySerializer::serialize).collect(Collectors.toList()); List featureReferenceWithTsByteList = new ArrayList<>(); featureReferences.stream() @@ -73,7 +78,7 @@ private List> getFeaturesFromRedis( byte[] featureReferenceBytes = RedisHashDecoder.getFeatureReferenceRedisHashKeyBytes(featureReference); featureReferenceWithTsByteList.add(featureReferenceBytes); - byteToFeatureReferenceMap.put(featureReferenceBytes.toString(), featureReference); + byteToFeatureReferenceMap.put(featureReferenceBytes, featureReference); // eg. <_ts:featuretable_name> byte[] featureTableTsBytes = @@ -97,6 +102,7 @@ private List> getFeaturesFromRedis( future -> { try { List> redisValuesList = future.get(); + List curRedisKeyFeatures = RedisHashDecoder.retrieveFeature( redisValuesList, byteToFeatureReferenceMap, timestampPrefix);