From fd9046072ba2ceb9132cc263f789a8684d66011a Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 16:31:51 +0800 Subject: [PATCH 1/6] add redis timeout option Signed-off-by: Oleksii Moskalenko --- .../feast/serving/config/FeastProperties.java | 266 ++---------------- .../config/ServingServiceConfigV2.java | 14 +- serving/src/main/resources/application.yml | 13 +- .../storage/api/retriever/StoreConfig.java} | 11 +- storage/connectors/redis/pom.xml | 8 + .../redis/retriever/RedisClient.java | 3 +- .../redis/retriever/RedisClusterClient.java | 67 ++--- .../retriever/RedisClusterStoreConfig.java | 45 +++ .../RedisStoreConfig.java} | 29 +- .../RedisKeyPrefixSerializerV2.java | 41 --- 10 files changed, 130 insertions(+), 367 deletions(-) rename storage/{connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java => api/src/main/java/feast/storage/api/retriever/StoreConfig.java} (73%) create mode 100644 storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java rename storage/connectors/redis/src/main/java/feast/storage/connectors/redis/{serializer/RedisKeyProtoSerializerV2.java => retriever/RedisStoreConfig.java} (51%) delete mode 100644 storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java index bf04845..e4f883b 100644 --- a/serving/src/main/java/feast/serving/config/FeastProperties.java +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -21,24 +21,18 @@ // https://www.baeldung.com/configuration-properties-in-spring-boot // https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html#boot-features-external-config-typesafe-configuration-properties -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import feast.common.auth.config.SecurityProperties; import feast.common.auth.config.SecurityProperties.AuthenticationProperties; import feast.common.auth.config.SecurityProperties.AuthorizationProperties; import feast.common.auth.credentials.CoreAuthenticationProperties; import feast.common.logging.config.LoggingProperties; -import feast.proto.core.StoreProto; +import feast.storage.connectors.redis.retriever.RedisClusterStoreConfig; +import feast.storage.connectors.redis.retriever.RedisStoreConfig; +import io.lettuce.core.ReadFrom; +import java.time.Duration; import java.util.*; -import java.util.stream.Collectors; import javax.annotation.PostConstruct; -import javax.validation.ConstraintViolation; -import javax.validation.ConstraintViolationException; -import javax.validation.Validation; -import javax.validation.Validator; -import javax.validation.ValidatorFactory; +import javax.validation.*; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Positive; @@ -259,8 +253,6 @@ public static class Store { private Map config = new HashMap<>(); - private List subscriptions = new ArrayList<>(); - /** * Gets name of this store. This is unique to this specific instance. * @@ -280,12 +272,12 @@ public void setName(String name) { } /** - * Gets the store type. Example are REDIS or BIGQUERY + * Gets the store type. Example are REDIS or REDIS_CLUSTER * * @return the store type as a String. */ - public String getType() { - return type; + public StoreType getType() { + return StoreType.valueOf(this.type); } /** @@ -297,64 +289,6 @@ public void setType(String type) { this.type = type; } - /** - * Converts this {@link Store} to a {@link StoreProto.Store} - * - * @return {@link StoreProto.Store} with configuration set - * @throws InvalidProtocolBufferException the invalid protocol buffer exception - * @throws JsonProcessingException the json processing exception - */ - public StoreProto.Store toProto() - throws InvalidProtocolBufferException, JsonProcessingException { - List subscriptions = getSubscriptions(); - List subscriptionProtos = - subscriptions.stream().map(Subscription::toProto).collect(Collectors.toList()); - - StoreProto.Store.Builder storeProtoBuilder = - StoreProto.Store.newBuilder() - .setName(name) - .setType(StoreProto.Store.StoreType.valueOf(type)) - .addAllSubscriptions(subscriptionProtos); - - ObjectMapper jsonWriter = new ObjectMapper(); - - // TODO: All of this logic should be moved to the store layer. Only a Map - // should be sent to a store and it should do its own validation. - switch (StoreProto.Store.StoreType.valueOf(type)) { - case REDIS_CLUSTER: - StoreProto.Store.RedisClusterConfig.Builder redisClusterConfig = - StoreProto.Store.RedisClusterConfig.newBuilder(); - JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), redisClusterConfig); - return storeProtoBuilder.setRedisClusterConfig(redisClusterConfig.build()).build(); - case REDIS: - StoreProto.Store.RedisConfig.Builder redisConfig = - StoreProto.Store.RedisConfig.newBuilder(); - JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), redisConfig); - return storeProtoBuilder.setRedisConfig(redisConfig.build()).build(); - default: - throw new InvalidProtocolBufferException("Invalid store set"); - } - } - - /** - * Get the subscriptions to this specific store. The subscriptions indicate which feature sets a - * store subscribes to. - * - * @return List of subscriptions. - */ - public List getSubscriptions() { - return subscriptions; - } - - /** - * Sets the store specific configuration. See getSubscriptions() for more details. - * - * @param subscriptions the subscriptions list - */ - public void setSubscriptions(List subscriptions) { - this.subscriptions = subscriptions; - } - /** * Gets the configuration to this specific store. This is a map of strings. These options are * unique to the store. Please see protos/feast/core/Store.proto for the store specific @@ -366,6 +300,20 @@ public Map getConfig() { return config; } + public RedisClusterStoreConfig getRedisClusterConfig() { + return new RedisClusterStoreConfig( + this.config.get("connection_string"), + ReadFrom.valueOf(this.config.get("read_from")), + Duration.parse(this.config.get("timeout"))); + } + + public RedisStoreConfig getRedisConfig() { + return new RedisStoreConfig( + this.config.get("host"), + Integer.valueOf(this.config.get("port")), + Boolean.valueOf(this.config.getOrDefault("ssl", "false"))); + } + /** * Sets the store config. Please protos/feast/core/Store.proto for the specific options for each * store. @@ -375,129 +323,11 @@ public Map getConfig() { public void setConfig(Map config) { this.config = config; } - - /** - * The Subscription type. - * - *

Note: Please see protos/feast/core/CoreService.proto for details on how to subscribe to - * feature sets. - */ - public static class Subscription { - /** Feast project to subscribe to. */ - String project; - - /** Feature set to subscribe to. */ - String name; - - /** Feature set versions to subscribe to. */ - String version; - - /** Project/Feature set exclude flag to subscribe to. */ - boolean exclude; - - /** - * Gets Feast project subscribed to. - * - * @return the project string - */ - public String getProject() { - return project; - } - - /** - * Sets Feast project to subscribe to for this store. - * - * @param project the project - */ - public void setProject(String project) { - this.project = project; - } - - /** - * Gets the feature set name to subscribe to. - * - * @return the name - */ - public String getName() { - return name; - } - - /** - * Sets the feature set name to subscribe to. - * - * @param name the name - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets the feature set version that is being subscribed to by this store. - * - * @return the version - */ - public String getVersion() { - return version; - } - - /** - * Sets the feature set version that is being subscribed to by this store. - * - * @param version the version - */ - public void setVersion(String version) { - this.version = version; - } - - /** - * Gets the exclude flag to subscribe to. - * - * @return the exclude flag - */ - public boolean getExclude() { - return exclude; - } - - /** - * Sets the exclude flag to subscribe to. - * - * @param exclude the exclude flag - */ - public void setExclude(boolean exclude) { - this.exclude = exclude; - } - - /** - * Convert this {@link Subscription} to a {@link StoreProto.Store.Subscription}. - * - * @return the store proto . store . subscription - */ - public StoreProto.Store.Subscription toProto() { - return StoreProto.Store.Subscription.newBuilder() - .setName(getName()) - .setProject(getProject()) - .setExclude(getExclude()) - .build(); - } - } } - /** - * Gets job store properties - * - * @return the job store properties - */ - public JobStoreProperties getJobStore() { - return jobStore; - } - - /** - * Set job store properties - * - * @param jobStore Job store properties to set - */ - public void setJobStore(JobStoreProperties jobStore) { - this.jobStore = jobStore; + public enum StoreType { + REDIS, + REDIS_CLUSTER; } /** @@ -532,52 +362,6 @@ public void setLogging(LoggingProperties logging) { this.logging = logging; } - /** The type Job store properties. */ - public static class JobStoreProperties { - - /** Job Store Redis Host */ - private String redisHost; - - /** Job Store Redis Host */ - private int redisPort; - - /** - * Gets redis host. - * - * @return the redis host - */ - public String getRedisHost() { - return redisHost; - } - - /** - * Sets redis host. - * - * @param redisHost the redis host - */ - public void setRedisHost(String redisHost) { - this.redisHost = redisHost; - } - - /** - * Gets redis port. - * - * @return the redis port - */ - public int getRedisPort() { - return redisPort; - } - - /** - * Sets redis port. - * - * @param redisPort the redis port - */ - public void setRedisPort(int redisPort) { - this.redisPort = redisPort; - } - } - /** Trace metric collection properties */ public static class TracingProperties { diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java index 9ec35c3..518f3a1 100644 --- a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java +++ b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.InvalidProtocolBufferException; -import feast.proto.core.StoreProto; import feast.serving.service.OnlineServingServiceV2; import feast.serving.service.ServingServiceV2; import feast.serving.specs.CachedSpecService; @@ -39,26 +38,19 @@ public ServingServiceV2 servingServiceV2( throws InvalidProtocolBufferException, JsonProcessingException { ServingServiceV2 servingService = null; FeastProperties.Store store = feastProperties.getActiveStore(); - StoreProto.Store.StoreType storeType = store.toProto().getType(); - switch (storeType) { + switch (store.getType()) { case REDIS_CLUSTER: RedisClientAdapter redisClusterClient = - RedisClusterClient.create(store.toProto().getRedisClusterConfig()); + RedisClusterClient.create(store.getRedisClusterConfig()); OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient); servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer); break; case REDIS: - RedisClientAdapter redisClient = RedisClient.create(store.toProto().getRedisConfig()); + RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient); servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer); break; - case UNRECOGNIZED: - case INVALID: - throw new IllegalArgumentException( - String.format( - "Unsupported store type '%s' for store name '%s'", - store.getType(), store.getName())); } return servingService; diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 288ec7e..b20fd8e 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -41,26 +41,19 @@ feast: stores: # Please see https://api.docs.feast.dev/grpc/feast.core.pb.html#Store for configuration options - name: online # Name of the store (referenced by active_store) - type: REDIS # Type of the store. REDIS, REDIS_CLUSTER, BIGQUERY are available options + type: REDIS # Type of the store. REDIS, REDIS_CLUSTER are available options config: # Store specific configuration. See host: localhost port: 6379 # Subscriptions indicate which feature sets needs to be retrieved and used to populate this store - subscriptions: - # Wildcards match all options. No filtering is done. - - name: "*" - project: "*" - name: online_cluster type: REDIS_CLUSTER config: # Store specific configuration. # Connection string specifies the host:port of Redis instances in the redis cluster. connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" read_from: MASTER - subscriptions: - - name: "*" - project: "*" - version: "*" - + # Redis operation timeout in ISO-8601 format + timeout: PT0.5S tracing: # If true, Feast will provide tracing data (using OpenTracing API) for various RPC method calls # which can be useful to debug performance issues and perform benchmarking diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java b/storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java similarity index 73% rename from storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java rename to storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java index b79e158..dd7b0c1 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java +++ b/storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java @@ -1,6 +1,6 @@ /* * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors + * Copyright 2018-2021 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.storage.connectors.redis.serializer; +package feast.storage.api.retriever; -import feast.proto.storage.RedisProto.RedisKeyV2; - -public interface RedisKeySerializerV2 { - - byte[] serialize(RedisKeyV2 key); -} +public interface StoreConfig {} diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml index bbda8da..f65cbd0 100644 --- a/storage/connectors/redis/pom.xml +++ b/storage/connectors/redis/pom.xml @@ -16,6 +16,14 @@ io.lettuce lettuce-core + 6.0.2.RELEASE + + + + io.netty + netty-transport-native-epoll + 4.1.52.Final + linux-x86_64 diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java index 5a7f4b7..faa8e96 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java @@ -16,7 +16,6 @@ */ package feast.storage.connectors.redis.retriever; -import feast.proto.core.StoreProto; import io.lettuce.core.KeyValue; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; @@ -46,7 +45,7 @@ private RedisClient(StatefulRedisConnection connection) { this.asyncCommands.setAutoFlushCommands(false); } - public static RedisClientAdapter create(StoreProto.Store.RedisConfig config) { + public static RedisClientAdapter create(RedisStoreConfig config) { RedisURI uri = RedisURI.create(config.getHost(), config.getPort()); diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java index aeb8220..917f331 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java @@ -16,36 +16,19 @@ */ package feast.storage.connectors.redis.retriever; -import com.google.common.collect.ImmutableMap; -import feast.proto.core.StoreProto; -import feast.proto.core.StoreProto.Store.RedisClusterConfig; -import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2; -import feast.storage.connectors.redis.serializer.RedisKeySerializerV2; -import io.lettuce.core.KeyValue; -import io.lettuce.core.ReadFrom; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.RedisURI; +import io.lettuce.core.*; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.lettuce.core.codec.ByteArrayCodec; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import javax.annotation.Nullable; public class RedisClusterClient implements RedisClientAdapter { private final RedisAdvancedClusterAsyncCommands asyncCommands; - private final RedisKeySerializerV2 serializer; - @Nullable private final RedisKeySerializerV2 fallbackSerializer; - - private static final Map PROTO_TO_LETTUCE_TYPES = - ImmutableMap.of( - RedisClusterConfig.ReadFrom.MASTER, ReadFrom.MASTER, - RedisClusterConfig.ReadFrom.MASTER_PREFERRED, ReadFrom.MASTER_PREFERRED, - RedisClusterConfig.ReadFrom.REPLICA, ReadFrom.REPLICA, - RedisClusterConfig.ReadFrom.REPLICA_PREFERRED, ReadFrom.REPLICA_PREFERRED); @Override public RedisFuture>> hmget(byte[] key, byte[]... fields) { @@ -59,19 +42,9 @@ public void flushCommands() { static class Builder { private final StatefulRedisClusterConnection connection; - private final RedisKeySerializerV2 serializer; - @Nullable private RedisKeySerializerV2 fallbackSerializer; - Builder( - StatefulRedisClusterConnection connection, - RedisKeySerializerV2 serializer) { + Builder(StatefulRedisClusterConnection connection) { this.connection = connection; - this.serializer = serializer; - } - - Builder withFallbackSerializer(RedisKeySerializerV2 fallbackSerializer) { - this.fallbackSerializer = fallbackSerializer; - return this; } RedisClusterClient build() { @@ -81,8 +54,6 @@ RedisClusterClient build() { private RedisClusterClient(Builder builder) { this.asyncCommands = builder.connection.async(); - this.serializer = builder.serializer; - this.fallbackSerializer = builder.fallbackSerializer; // allows reading from replicas this.asyncCommands.readOnly(); @@ -91,7 +62,7 @@ private RedisClusterClient(Builder builder) { this.asyncCommands.setAutoFlushCommands(false); } - public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig config) { + public static RedisClientAdapter create(RedisClusterStoreConfig config) { List redisURIList = Arrays.stream(config.getConnectionString().split(",")) .map( @@ -100,22 +71,22 @@ public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig conf return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1])); }) .collect(Collectors.toList()); - StatefulRedisClusterConnection connection = - io.lettuce.core.cluster.RedisClusterClient.create(redisURIList) - .connect(new ByteArrayCodec()); - connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom())); + io.lettuce.core.cluster.RedisClusterClient client = + io.lettuce.core.cluster.RedisClusterClient.create(redisURIList); + client.setOptions( + ClusterClientOptions.builder() + .socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build()) + .timeoutOptions(TimeoutOptions.enabled(config.getTimeout())) + .pingBeforeActivateConnection(true) + .topologyRefreshOptions( + ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build()) + .build()); - RedisKeySerializerV2 serializer = new RedisKeyPrefixSerializerV2(config.getKeyPrefix()); - - Builder builder = new Builder(connection, serializer); - - if (config.getEnableFallback()) { - RedisKeySerializerV2 fallbackSerializer = - new RedisKeyPrefixSerializerV2(config.getKeyPrefix()); - builder = builder.withFallbackSerializer(fallbackSerializer); - } + StatefulRedisClusterConnection connection = + client.connect(new ByteArrayCodec()); + connection.setReadFrom(config.getmReadFrom()); - return builder.build(); + return new Builder(connection).build(); } } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java new file mode 100644 index 0000000..c6d81ec --- /dev/null +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.storage.connectors.redis.retriever; + +import feast.storage.api.retriever.StoreConfig; +import io.lettuce.core.ReadFrom; +import java.time.Duration; + +public class RedisClusterStoreConfig implements StoreConfig { + private final String connectionString; + private final ReadFrom readFrom; + private final Duration timeout; + + public RedisClusterStoreConfig(String connectionString, ReadFrom readFrom, Duration timeout) { + this.connectionString = connectionString; + this.readFrom = readFrom; + this.timeout = timeout; + } + + public String getConnectionString() { + return this.connectionString; + } + + public ReadFrom getReadFrom() { + return this.readFrom; + } + + public Duration getTimeout() { + return this.timeout; + } +} diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java similarity index 51% rename from storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java rename to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java index 252d6d1..de46a53 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java @@ -1,6 +1,6 @@ /* * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors + * Copyright 2018-2021 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,13 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.storage.connectors.redis.serializer; +package feast.storage.connectors.redis.retriever; -import feast.proto.storage.RedisProto.RedisKeyV2; +import feast.storage.api.retriever.StoreConfig; -public class RedisKeyProtoSerializerV2 implements RedisKeySerializerV2 { +public class RedisStoreConfig implements StoreConfig { + private final String host; + private final Integer port; + private final Boolean ssl; - public byte[] serialize(RedisKeyV2 redisKey) { - return redisKey.toByteArray(); + public RedisStoreConfig(String host, Integer port, Boolean ssl) { + this.host = host; + this.port = port; + this.ssl = ssl; + } + + public String getHost() { + return this.host; + } + + public Integer getPort() { + return this.port; + } + + public Boolean getSsl() { + return this.ssl; } } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java deleted file mode 100644 index 1c869b4..0000000 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.redis.serializer; - -import feast.proto.storage.RedisProto.RedisKeyV2; - -public class RedisKeyPrefixSerializerV2 implements RedisKeySerializerV2 { - - private final byte[] prefixBytes; - - public RedisKeyPrefixSerializerV2(String prefix) { - this.prefixBytes = prefix.getBytes(); - } - - public byte[] serialize(RedisKeyV2 redisKey) { - byte[] key = redisKey.toByteArray(); - - if (prefixBytes.length == 0) { - return key; - } - - byte[] keyWithPrefix = new byte[prefixBytes.length + key.length]; - System.arraycopy(prefixBytes, 0, keyWithPrefix, 0, prefixBytes.length); - System.arraycopy(key, 0, keyWithPrefix, prefixBytes.length, key.length); - return keyWithPrefix; - } -} From b3119059ca57d5cbff0272885cad486b379dbd31 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 16:38:37 +0800 Subject: [PATCH 2/6] fix typo Signed-off-by: Oleksii Moskalenko --- .../storage/connectors/redis/retriever/RedisClusterClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java index 917f331..5395b72 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java @@ -85,7 +85,7 @@ public static RedisClientAdapter create(RedisClusterStoreConfig config) { StatefulRedisClusterConnection connection = client.connect(new ByteArrayCodec()); - connection.setReadFrom(config.getmReadFrom()); + connection.setReadFrom(config.getReadFrom()); return new Builder(connection).build(); } From 83b4ef135fc4e7b14298bd4386d29cbca582c8d1 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 16:42:22 +0800 Subject: [PATCH 3/6] remove StoreProto.Store Signed-off-by: Oleksii Moskalenko --- .../feast/serving/config/FeastProperties.java | 3 -- .../serving/config/SpecServiceConfig.java | 7 +-- .../serving/specs/CachedSpecService.java | 4 +- .../service/CachedSpecServiceTest.java | 15 +------ .../RedisKeyPrefixSerializerTest.java | 45 ------------------- 5 files changed, 4 insertions(+), 70 deletions(-) delete mode 100644 storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java index e4f883b..3b8548a 100644 --- a/serving/src/main/java/feast/serving/config/FeastProperties.java +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -140,9 +140,6 @@ public void setActiveStore(String activeStore) { */ private List stores = new ArrayList<>(); - /* Job Store properties to retain state of async jobs. */ - private JobStoreProperties jobStore; - /* Metric tracing properties. */ private TracingProperties tracing; diff --git a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java index b41a0f5..369d543 100644 --- a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.InvalidProtocolBufferException; -import feast.proto.core.StoreProto; import feast.serving.specs.CachedSpecService; import feast.serving.specs.CoreSpecService; import io.grpc.CallCredentials; @@ -61,13 +60,11 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( } @Bean - public CachedSpecService specService( - FeastProperties feastProperties, ObjectProvider callCredentials) + public CachedSpecService specService(ObjectProvider callCredentials) throws InvalidProtocolBufferException, JsonProcessingException { CoreSpecService coreService = new CoreSpecService(feastCoreHost, feastCorePort, callCredentials); - StoreProto.Store storeProto = feastProperties.getActiveStore().toProto(); - CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, storeProto); + CachedSpecService cachedSpecStorage = new CachedSpecService(coreService); try { cachedSpecStorage.populateCache(); } catch (Exception e) { diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index f54e08f..c226406 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -26,7 +26,6 @@ import feast.proto.core.FeatureProto; import feast.proto.core.FeatureTableProto.FeatureTable; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto; import feast.serving.exception.SpecRetrievalException; @@ -69,9 +68,8 @@ public class CachedSpecService { ImmutablePair, FeatureProto.FeatureSpecV2> featureCache; - public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) { + public CachedSpecService(CoreSpecService coreService) { this.coreService = coreService; - this.store = coreService.registerStore(store); CacheLoader, FeatureTableSpec> featureTableCacheLoader = CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight())); diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index 57932d4..9a3bd52 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -32,7 +32,6 @@ import feast.proto.core.CoreServiceProto.ListProjectsResponse; import feast.proto.core.FeatureTableProto; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.types.ValueProto; import feast.serving.specs.CachedSpecService; @@ -45,8 +44,6 @@ public class CachedSpecServiceTest { - private Store store; - @Rule public final ExpectedException expectedException = ExpectedException.none(); @Mock CoreSpecService coreService; @@ -63,8 +60,6 @@ public class CachedSpecServiceTest { public void setUp() { initMocks(this); - this.store = Store.newBuilder().build(); - this.setupProject("default"); this.featureTableEntities = ImmutableList.of("entity1"); this.featureTable1Features = @@ -94,8 +89,7 @@ public void setUp() { this.setupFeatureTableAndProject("default"); - when(this.coreService.registerStore(store)).thenReturn(store); - cachedSpecService = new CachedSpecService(this.coreService, this.store); + cachedSpecService = new CachedSpecService(this.coreService); } private void setupProject(String project) { @@ -125,13 +119,6 @@ public void shouldRegisterStoreWithCore() { verify(coreService, times(1)).registerStore(cachedSpecService.getStore()); } - @Test - public void shouldPopulateAndReturnStore() { - cachedSpecService.populateCache(); - Store actual = cachedSpecService.getStore(); - assertThat(actual, equalTo(store)); - } - @Test public void shouldPopulateAndReturnDifferentFeatureTables() { // test that CachedSpecService can retrieve fully qualified feature references. diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java deleted file mode 100644 index e663cf8..0000000 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.redis.serializer; - -import static org.junit.Assert.*; - -import feast.proto.storage.RedisProto.RedisKeyV2; -import feast.proto.types.ValueProto; -import org.junit.Test; - -public class RedisKeyPrefixSerializerTest { - - private RedisKeyV2 key = - RedisKeyV2.newBuilder() - .addEntityNames("entity1") - .addEntityValues(ValueProto.Value.newBuilder().setInt64Val(1)) - .build(); - - @Test - public void shouldPrependKey() { - RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2("namespace:"); - String keyWithPrefix = new String(serializer.serialize(key)); - assertEquals(String.format("namespace:%s", new String(key.toByteArray())), keyWithPrefix); - } - - @Test - public void shouldNotPrependKeyIfEmptyString() { - RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2(""); - assertArrayEquals(key.toByteArray(), serializer.serialize(key)); - } -} From b7cfc91691683dcd23bec0824801c5de1342d2d1 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 16:46:09 +0800 Subject: [PATCH 4/6] remove StoreProto.Store Signed-off-by: Oleksii Moskalenko --- .../controller/HealthServiceController.java | 2 -- .../serving/specs/CachedSpecService.java | 11 ---------- .../feast/serving/specs/CoreSpecService.java | 21 ------------------- 3 files changed, 34 deletions(-) diff --git a/serving/src/main/java/feast/serving/controller/HealthServiceController.java b/serving/src/main/java/feast/serving/controller/HealthServiceController.java index 6615cf5..4bee981 100644 --- a/serving/src/main/java/feast/serving/controller/HealthServiceController.java +++ b/serving/src/main/java/feast/serving/controller/HealthServiceController.java @@ -16,7 +16,6 @@ */ package feast.serving.controller; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest; import feast.serving.interceptors.GrpcMonitoringInterceptor; import feast.serving.service.ServingServiceV2; @@ -51,7 +50,6 @@ public void check( // Implement similary for batch service. try { - Store store = specService.getStore(); servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance()); responseObserver.onNext( HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index c226406..440b224 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -26,7 +26,6 @@ import feast.proto.core.FeatureProto; import feast.proto.core.FeatureTableProto.FeatureTable; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto; import feast.serving.exception.SpecRetrievalException; import io.grpc.StatusRuntimeException; @@ -46,7 +45,6 @@ public class CachedSpecService { private static final String DEFAULT_PROJECT_NAME = "default"; private final CoreSpecService coreService; - private Store store; private static Gauge cacheLastUpdated = Gauge.build() @@ -83,15 +81,6 @@ public CachedSpecService(CoreSpecService coreService) { featureCache = CacheBuilder.newBuilder().build(featureCacheLoader); } - /** - * Get the current store configuration. - * - * @return StoreProto.Store store configuration for this serving instance - */ - public Store getStore() { - return this.store; - } - /** * Reload the store configuration from the given config path, then retrieve the necessary specs * from core to preload the cache. diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java index 5429d22..eee50d8 100644 --- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CoreSpecService.java @@ -24,7 +24,6 @@ import feast.proto.core.CoreServiceProto.ListProjectsResponse; import feast.proto.core.CoreServiceProto.UpdateStoreRequest; import feast.proto.core.CoreServiceProto.UpdateStoreResponse; -import feast.proto.core.StoreProto.Store; import io.grpc.CallCredentials; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -53,26 +52,6 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) { return blockingStub.updateStore(updateStoreRequest); } - /** - * Register the given store entry in Feast Core. If store already exists in Feast Core, updates - * the store entry in feast core. - * - * @param store entry to register/update in Feast Core. - * @return The register/updated store entry - */ - public Store registerStore(Store store) { - UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build(); - try { - UpdateStoreResponse updateStoreResponse = this.updateStore(request); - if (!updateStoreResponse.getStore().equals(store)) { - throw new RuntimeException("Core store config not matching current store config"); - } - return updateStoreResponse.getStore(); - } catch (Exception e) { - throw new RuntimeException("Unable to update store configuration", e); - } - } - public ListProjectsResponse listProjects(ListProjectsRequest listProjectsRequest) { return blockingStub.listProjects(listProjectsRequest); } From 8ec486176689f71eb1ee9e129f6d895a0fef2808 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 16:52:06 +0800 Subject: [PATCH 5/6] fix test Signed-off-by: Oleksii Moskalenko --- .../java/feast/serving/service/CachedSpecServiceTest.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index 9a3bd52..4e48b64 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -18,8 +18,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -114,11 +112,6 @@ private void setupFeatureTableAndProject(String project) { .build()); } - @Test - public void shouldRegisterStoreWithCore() { - verify(coreService, times(1)).registerStore(cachedSpecService.getStore()); - } - @Test public void shouldPopulateAndReturnDifferentFeatureTables() { // test that CachedSpecService can retrieve fully qualified feature references. From 7eb226d722016bfd575e5bf8f37730f9073c8825 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 4 Mar 2021 17:57:30 +0800 Subject: [PATCH 6/6] clean up Signed-off-by: Oleksii Moskalenko --- .../storage/api/retriever/StoreConfig.java | 19 ------------------- .../retriever/RedisClusterStoreConfig.java | 3 +-- .../redis/retriever/RedisStoreConfig.java | 4 +--- 3 files changed, 2 insertions(+), 24 deletions(-) delete mode 100644 storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java diff --git a/storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java b/storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java deleted file mode 100644 index dd7b0c1..0000000 --- a/storage/api/src/main/java/feast/storage/api/retriever/StoreConfig.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2021 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.api.retriever; - -public interface StoreConfig {} diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java index c6d81ec..c179ffe 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java @@ -16,11 +16,10 @@ */ package feast.storage.connectors.redis.retriever; -import feast.storage.api.retriever.StoreConfig; import io.lettuce.core.ReadFrom; import java.time.Duration; -public class RedisClusterStoreConfig implements StoreConfig { +public class RedisClusterStoreConfig { private final String connectionString; private final ReadFrom readFrom; private final Duration timeout; diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java index de46a53..5e4560a 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java @@ -16,9 +16,7 @@ */ package feast.storage.connectors.redis.retriever; -import feast.storage.api.retriever.StoreConfig; - -public class RedisStoreConfig implements StoreConfig { +public class RedisStoreConfig { private final String host; private final Integer port; private final Boolean ssl;