From 7d6208700b86edd4019c684216355c2799da5589 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Wed, 14 Apr 2021 15:27:26 +0800 Subject: [PATCH] all feast types supported by avro decoder Signed-off-by: Oleksii Moskalenko --- .../serving/it/ServingServiceBigTableIT.java | 143 ++++++++++++++- storage/api/pom.xml | 6 + .../storage/api/retriever/AvroFeature.java | 171 ++++++++++++++++++ .../storage/api/retriever/NativeFeature.java | 95 ---------- .../retriever/BigTableOnlineRetriever.java | 6 +- .../retriever/CassandraOnlineRetriever.java | 6 +- 6 files changed, 317 insertions(+), 110 deletions(-) create mode 100644 storage/api/src/main/java/feast/storage/api/retriever/AvroFeature.java delete mode 100644 storage/api/src/main/java/feast/storage/api/retriever/NativeFeature.java diff --git a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java index f90a956..0844cbc 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java @@ -47,6 +47,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; import java.util.List; @@ -54,10 +55,12 @@ import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.avro.io.*; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; import org.junit.ClassRule; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -140,9 +143,6 @@ static void globalSetup() throws IOException { + ":" + environment.getServicePort("bigtable_1", BIGTABLE_PORT); channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); - TransportChannelProvider channelProvider = - FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); - NoCredentialsProvider credentialsProvider = NoCredentialsProvider.create(); /** Feast resource creation Workflow */ String projectName = "default"; @@ -210,9 +210,6 @@ static void globalSetup() throws IOException { ImmutableList compoundColumnFamilies = ImmutableList.of(rideMerchantFeatureTableName, metadataColumnFamily); - createTable(channelProvider, credentialsProvider, btTableName, columnFamilies); - createTable(channelProvider, credentialsProvider, compoundBtTableName, compoundColumnFamilies); - /** Single Entity Ingestion Workflow */ Schema ftSchema = SchemaBuilder.record("DriverData") @@ -319,7 +316,9 @@ private static void createTable( for (String columnFamily : columnFamilies) { createTableRequest.addFamily(columnFamily); } - client.createTable(createTableRequest); + if (!client.exists(tableName)) { + client.createTable(createTableRequest); + } } } @@ -348,17 +347,31 @@ private static byte[] createEntityValue( return entityFeatureValue; } + private static byte[] schemaReference(Schema schema) { + return Hashing.murmur3_32().hashBytes(schema.toString().getBytes()).asBytes(); + } + private static void ingestData( String featureTableName, String btTableName, byte[] btEntityFeatureKey, byte[] btEntityFeatureValue, byte[] btSchemaKey, - Schema btSchema) { + Schema btSchema) + throws IOException { String emptyQualifier = ""; String avroQualifier = "avro"; String metadataColumnFamily = "metadata"; + TransportChannelProvider channelProvider = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + NoCredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + createTable( + channelProvider, + credentialsProvider, + btTableName, + ImmutableList.of(featureTableName, metadataColumnFamily)); + // Update Compound Entity-Feature Row client.mutateRow( RowMutation.create(btTableName, ByteString.copyFrom(btEntityFeatureKey)) @@ -601,6 +614,118 @@ public void shouldReturnCorrectRowCount() { assertEquals(expectedFieldValuesList, featureResponse.getFieldValuesList()); } + @Test + public void shouldSupportAllFeastTypes() throws IOException { + EntityProto.EntitySpecV2 entitySpec = + EntityProto.EntitySpecV2.newBuilder() + .setName("entity") + .setDescription("") + .setValueType(ValueProto.ValueType.Enum.STRING) + .build(); + TestUtils.applyEntity(coreClient, "default", entitySpec); + + ImmutableMap allTypesFeatures = + new ImmutableMap.Builder() + .put("f_int64", ValueProto.ValueType.Enum.INT64) + .put("f_int32", ValueProto.ValueType.Enum.INT32) + .put("f_float", ValueProto.ValueType.Enum.FLOAT) + .put("f_double", ValueProto.ValueType.Enum.DOUBLE) + .put("f_string", ValueProto.ValueType.Enum.STRING) + .put("f_bytes", ValueProto.ValueType.Enum.BYTES) + .put("f_bool", ValueProto.ValueType.Enum.BOOL) + .put("f_int64_list", ValueProto.ValueType.Enum.INT64_LIST) + .put("f_int32_list", ValueProto.ValueType.Enum.INT32_LIST) + .put("f_float_list", ValueProto.ValueType.Enum.FLOAT_LIST) + .put("f_double_list", ValueProto.ValueType.Enum.DOUBLE_LIST) + .put("f_string_list", ValueProto.ValueType.Enum.STRING_LIST) + .put("f_bytes_list", ValueProto.ValueType.Enum.BYTES_LIST) + .put("f_bool_list", ValueProto.ValueType.Enum.BOOL_LIST) + .build(); + + TestUtils.applyFeatureTable( + coreClient, "default", "all_types", ImmutableList.of("entity"), allTypesFeatures, 7200); + + Schema schema = + SchemaBuilder.record("AllTypesRecord") + .namespace("") + .fields() + .requiredLong("f_int64") + .requiredInt("f_int32") + .requiredFloat("f_float") + .requiredDouble("f_double") + .requiredString("f_string") + .requiredBytes("f_bytes") + .requiredBoolean("f_bool") + .name("f_int64_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().longType())) + .noDefault() + .name("f_int32_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().intType())) + .noDefault() + .name("f_float_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().floatType())) + .noDefault() + .name("f_double_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().doubleType())) + .noDefault() + .name("f_string_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().stringType())) + .noDefault() + .name("f_bytes_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().bytesType())) + .noDefault() + .name("f_bool_list") + .type(SchemaBuilder.array().items(SchemaBuilder.builder().booleanType())) + .noDefault() + .endRecord(); + + GenericData.Record record = + new GenericRecordBuilder(schema) + .set("f_int64", 10L) + .set("f_int32", 10) + .set("f_float", 10.0) + .set("f_double", 10.0D) + .set("f_string", "test") + .set("f_bytes", ByteBuffer.wrap("test".getBytes())) + .set("f_bool", true) + .set("f_int64_list", ImmutableList.of(10L)) + .set("f_int32_list", ImmutableList.of(10)) + .set("f_float_list", ImmutableList.of(10.0)) + .set("f_double_list", ImmutableList.of(10.0D)) + .set("f_string_list", ImmutableList.of("test")) + .set("f_bytes_list", ImmutableList.of(ByteBuffer.wrap("test".getBytes()))) + .set("f_bool_list", ImmutableList.of(true)) + .build(); + + ValueProto.Value entity = DataGenerator.createStrValue("key"); + + ingestData( + "all_types", + "default__entity", + entity.getStringVal().getBytes(), + createEntityValue(schema, schemaReference(schema), record), + createSchemaKey(schemaReference(schema)), + schema); + + GetOnlineFeaturesRequestV2 onlineFeatureRequest = + TestUtils.createOnlineFeatureRequest( + "default", + allTypesFeatures.keySet().stream() + .map( + f -> + FeatureReferenceV2.newBuilder() + .setFeatureTable("all_types") + .setName(f) + .build()) + .collect(Collectors.toList()), + ImmutableList.of(DataGenerator.createEntityRow("entity", entity, 100))); + GetOnlineFeaturesResponse featureResponse = + servingStub.getOnlineFeaturesV2(onlineFeatureRequest); + + assert featureResponse.getFieldValues(0).getStatusesMap().values().stream() + .allMatch(status -> status.equals(GetOnlineFeaturesResponse.FieldStatus.PRESENT)); + } + @TestConfiguration public static class TestConfig { @Bean diff --git a/storage/api/pom.xml b/storage/api/pom.xml index cc2f84e..4e7ad39 100644 --- a/storage/api/pom.xml +++ b/storage/api/pom.xml @@ -61,6 +61,12 @@ 3.9 + + org.apache.avro + avro + 1.10.2 + + junit junit diff --git a/storage/api/src/main/java/feast/storage/api/retriever/AvroFeature.java b/storage/api/src/main/java/feast/storage/api/retriever/AvroFeature.java new file mode 100644 index 0000000..96f19cc --- /dev/null +++ b/storage/api/src/main/java/feast/storage/api/retriever/AvroFeature.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.storage.api.retriever; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import feast.proto.serving.ServingAPIProto; +import feast.proto.types.ValueProto; +import java.nio.ByteBuffer; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; + +public class AvroFeature implements Feature { + private final ServingAPIProto.FeatureReferenceV2 featureReference; + + private final Timestamp eventTimestamp; + + private final Object featureValue; + + public AvroFeature( + ServingAPIProto.FeatureReferenceV2 featureReference, + Timestamp eventTimestamp, + Object featureValue) { + this.featureReference = featureReference; + this.eventTimestamp = eventTimestamp; + this.featureValue = featureValue; + } + + /** + * Casts feature value of Object type based on Feast valueType. Empty object i.e new Object() is + * interpreted as VAL_NOT_SET Feast valueType. + * + * @param valueType Feast valueType of feature as specified in FeatureSpec + * @return ValueProto.Value representation of feature + */ + @Override + public ValueProto.Value getFeatureValue(ValueProto.ValueType.Enum valueType) { + ValueProto.Value finalValue; + + try { + switch (valueType) { + case STRING: + finalValue = + ValueProto.Value.newBuilder().setStringVal(((Utf8) featureValue).toString()).build(); + break; + case INT32: + finalValue = ValueProto.Value.newBuilder().setInt32Val((Integer) featureValue).build(); + break; + case INT64: + finalValue = ValueProto.Value.newBuilder().setInt64Val((Long) featureValue).build(); + break; + case DOUBLE: + finalValue = ValueProto.Value.newBuilder().setDoubleVal((Double) featureValue).build(); + break; + case FLOAT: + finalValue = ValueProto.Value.newBuilder().setFloatVal((Float) featureValue).build(); + break; + case BYTES: + finalValue = + ValueProto.Value.newBuilder() + .setBytesVal(ByteString.copyFrom(((ByteBuffer) featureValue).array())) + .build(); + break; + case BOOL: + finalValue = ValueProto.Value.newBuilder().setBoolVal((Boolean) featureValue).build(); + break; + case STRING_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setStringListVal( + ValueProto.StringList.newBuilder() + .addAllVal( + ((GenericData.Array) featureValue) + .stream().map(Utf8::toString).collect(Collectors.toList())) + .build()) + .build(); + break; + case INT64_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setInt64ListVal( + ValueProto.Int64List.newBuilder() + .addAllVal(((GenericData.Array) featureValue)) + .build()) + .build(); + break; + case INT32_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setInt32ListVal( + ValueProto.Int32List.newBuilder() + .addAllVal(((GenericData.Array) featureValue)) + .build()) + .build(); + break; + case FLOAT_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setFloatListVal( + ValueProto.FloatList.newBuilder() + .addAllVal(((GenericData.Array) featureValue)) + .build()) + .build(); + break; + case DOUBLE_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setDoubleListVal( + ValueProto.DoubleList.newBuilder() + .addAllVal(((GenericData.Array) featureValue)) + .build()) + .build(); + break; + case BOOL_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setBoolListVal( + ValueProto.BoolList.newBuilder() + .addAllVal(((GenericData.Array) featureValue)) + .build()) + .build(); + break; + case BYTES_LIST: + finalValue = + ValueProto.Value.newBuilder() + .setBytesListVal( + ValueProto.BytesList.newBuilder() + .addAllVal( + ((GenericData.Array) featureValue) + .stream() + .map(byteBuffer -> ByteString.copyFrom(byteBuffer.array())) + .collect(Collectors.toList())) + .build()) + .build(); + break; + default: + throw new RuntimeException("FeatureType is not supported"); + } + } catch (ClassCastException e) { + // Feature type has changed + finalValue = ValueProto.Value.newBuilder().build(); + } + + return finalValue; + } + + @Override + public ServingAPIProto.FeatureReferenceV2 getFeatureReference() { + return this.featureReference; + } + + @Override + public Timestamp getEventTimestamp() { + return this.eventTimestamp; + } +} diff --git a/storage/api/src/main/java/feast/storage/api/retriever/NativeFeature.java b/storage/api/src/main/java/feast/storage/api/retriever/NativeFeature.java deleted file mode 100644 index db421f9..0000000 --- a/storage/api/src/main/java/feast/storage/api/retriever/NativeFeature.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2021 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.api.retriever; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import feast.proto.serving.ServingAPIProto; -import feast.proto.types.ValueProto; - -public class NativeFeature implements Feature { - private final ServingAPIProto.FeatureReferenceV2 featureReference; - - private final Timestamp eventTimestamp; - - private final Object featureValue; - - public NativeFeature( - ServingAPIProto.FeatureReferenceV2 featureReference, - Timestamp eventTimestamp, - Object featureValue) { - this.featureReference = featureReference; - this.eventTimestamp = eventTimestamp; - this.featureValue = featureValue; - } - - /** - * Casts feature value of Object type based on Feast valueType. Empty object i.e new Object() is - * interpreted as VAL_NOT_SET Feast valueType. - * - * @param valueType Feast valueType of feature as specified in FeatureSpec - * @return ValueProto.Value representation of feature - */ - @Override - public ValueProto.Value getFeatureValue(ValueProto.ValueType.Enum valueType) { - ValueProto.Value finalValue; - - try { - // Add various type cases - switch (valueType) { - case STRING: - finalValue = ValueProto.Value.newBuilder().setStringVal((String) featureValue).build(); - break; - case INT32: - finalValue = ValueProto.Value.newBuilder().setInt32Val((Integer) featureValue).build(); - break; - case INT64: - finalValue = ValueProto.Value.newBuilder().setInt64Val((Long) featureValue).build(); - break; - case DOUBLE: - finalValue = ValueProto.Value.newBuilder().setDoubleVal((Double) featureValue).build(); - break; - case FLOAT: - finalValue = ValueProto.Value.newBuilder().setFloatVal((Float) featureValue).build(); - break; - case BYTES: - finalValue = ValueProto.Value.newBuilder().setBytesVal((ByteString) featureValue).build(); - break; - case BOOL: - finalValue = ValueProto.Value.newBuilder().setBoolVal((Boolean) featureValue).build(); - break; - default: - throw new RuntimeException("FeatureType is not supported"); - } - } catch (ClassCastException e) { - // Feature type has changed - finalValue = ValueProto.Value.newBuilder().build(); - } - - return finalValue; - } - - @Override - public ServingAPIProto.FeatureReferenceV2 getFeatureReference() { - return this.featureReference; - } - - @Override - public Timestamp getEventTimestamp() { - return this.eventTimestamp; - } -} diff --git a/storage/connectors/bigtable/src/main/java/feast/storage/connectors/bigtable/retriever/BigTableOnlineRetriever.java b/storage/connectors/bigtable/src/main/java/feast/storage/connectors/bigtable/retriever/BigTableOnlineRetriever.java index cf82c14..6e67782 100644 --- a/storage/connectors/bigtable/src/main/java/feast/storage/connectors/bigtable/retriever/BigTableOnlineRetriever.java +++ b/storage/connectors/bigtable/src/main/java/feast/storage/connectors/bigtable/retriever/BigTableOnlineRetriever.java @@ -25,8 +25,8 @@ import com.google.protobuf.Timestamp; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow; +import feast.storage.api.retriever.AvroFeature; import feast.storage.api.retriever.Feature; -import feast.storage.api.retriever.NativeFeature; import feast.storage.connectors.sstable.retriever.SSTableOnlineRetriever; import java.io.IOException; import java.util.*; @@ -195,12 +195,12 @@ private List decodeFeatures( return null; } if (featureValue != null) { - return new NativeFeature( + return new AvroFeature( featureReference, Timestamp.newBuilder().setSeconds(timestamp / 1000).build(), featureValue); } - return new NativeFeature( + return new AvroFeature( featureReference, Timestamp.newBuilder().setSeconds(timestamp / 1000).build(), new Object()); diff --git a/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java b/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java index 55198e0..efa1119 100644 --- a/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java +++ b/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java @@ -24,8 +24,8 @@ import com.google.protobuf.Timestamp; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow; +import feast.storage.api.retriever.AvroFeature; import feast.storage.api.retriever.Feature; -import feast.storage.api.retriever.NativeFeature; import feast.storage.connectors.sstable.retriever.SSTableOnlineRetriever; import java.io.IOException; import java.nio.ByteBuffer; @@ -209,12 +209,12 @@ private List decodeFeatures( return null; } if (featureValue != null) { - return new NativeFeature( + return new AvroFeature( featureReference, Timestamp.newBuilder().setSeconds(timestamp / 1000).build(), featureValue); } - return new NativeFeature( + return new AvroFeature( featureReference, Timestamp.newBuilder().setSeconds(timestamp / 1000).build(), new Object());