diff --git a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java index 0844cbc..423e7c8 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java @@ -158,9 +158,21 @@ static void globalSetup() throws IOException { .build(); TestUtils.applyEntity(coreClient, projectName, driverEntitySpec); + // Apply Entity (this_is_a_long_long_long_long_long_long_entity_id) + String superLongEntityName = "this_is_a_long_long_long_long_long_long_entity_id"; + String superLongEntityDescription = "My super long entity id"; + ValueProto.ValueType.Enum superLongEntityType = ValueProto.ValueType.Enum.INT64; + EntityProto.EntitySpecV2 superLongEntitySpec = + EntityProto.EntitySpecV2.newBuilder() + .setName(superLongEntityName) + .setDescription(superLongEntityDescription) + .setValueType(superLongEntityType) + .build(); + TestUtils.applyEntity(coreClient, projectName, superLongEntitySpec); + // Apply Entity (merchant_id) String merchantEntityName = "merchant_id"; - String merchantEntityDescription = "My driver id"; + String merchantEntityDescription = "My merchant id"; ValueProto.ValueType.Enum merchantEntityType = ValueProto.ValueType.Enum.INT64; EntityProto.EntitySpecV2 merchantEntitySpec = EntityProto.EntitySpecV2.newBuilder() @@ -186,6 +198,27 @@ static void globalSetup() throws IOException { TestUtils.applyFeatureTable( coreClient, projectName, ridesFeatureTableName, ridesEntities, ridesFeatures, 7200); + // Apply FeatureTable (superLong) + String superLongFeatureTableName = "superlong"; + ImmutableList superLongEntities = ImmutableList.of(superLongEntityName); + ImmutableMap superLongFeatures = + ImmutableMap.of( + "trip_cost", + ValueProto.ValueType.Enum.INT64, + "trip_distance", + ValueProto.ValueType.Enum.DOUBLE, + "trip_empty", + ValueProto.ValueType.Enum.DOUBLE, + "trip_wrong_type", + ValueProto.ValueType.Enum.STRING); + TestUtils.applyFeatureTable( + coreClient, + projectName, + superLongFeatureTableName, + superLongEntities, + superLongFeatures, + 7200); + // Apply FeatureTable (rides_merchant) String rideMerchantFeatureTableName = "rides_merchant"; ImmutableList ridesMerchantEntities = @@ -199,6 +232,13 @@ static void globalSetup() throws IOException { 7200); // BigTable Table names + String superLongBtTableName = String.format("%s__%s", projectName, superLongEntityName); + String hashSuffix = + Hashing.murmur3_32().hashBytes(superLongBtTableName.substring(42).getBytes()).toString(); + superLongBtTableName = + superLongBtTableName + .substring(0, Math.min(superLongBtTableName.length(), 42)) + .concat(hashSuffix); String btTableName = String.format("%s__%s", projectName, driverEntityName); String compoundBtTableName = String.format( @@ -237,6 +277,39 @@ static void globalSetup() throws IOException { ingestData( featureTableName, btTableName, entityFeatureKey, entityFeatureValue, schemaKey, ftSchema); + /** SuperLong Entity Ingestion Workflow */ + Schema superLongFtSchema = + SchemaBuilder.record("SuperLongData") + .namespace(superLongFeatureTableName) + .fields() + .requiredLong(feature1Reference.getName()) + .requiredDouble(feature2Reference.getName()) + .nullableString(feature3Reference.getName(), "null") + .requiredString(feature4Reference.getName()) + .endRecord(); + byte[] superLongSchemaReference = + Hashing.murmur3_32().hashBytes(superLongFtSchema.toString().getBytes()).asBytes(); + + GenericRecord superLongRecord = + new GenericRecordBuilder(superLongFtSchema) + .set("trip_cost", 5L) + .set("trip_distance", 3.5) + .set("trip_empty", null) + .set("trip_wrong_type", "test") + .build(); + byte[] superLongEntityFeatureKey = + String.valueOf(DataGenerator.createInt64Value(1).getInt64Val()).getBytes(); + byte[] superLongEntityFeatureValue = + createEntityValue(superLongFtSchema, superLongSchemaReference, superLongRecord); + byte[] superLongSchemaKey = createSchemaKey(superLongSchemaReference); + ingestData( + superLongFeatureTableName, + superLongBtTableName, + superLongEntityFeatureKey, + superLongEntityFeatureValue, + superLongSchemaKey, + superLongFtSchema); + /** Compound Entity Ingestion Workflow */ Schema compoundFtSchema = SchemaBuilder.record("DriverMerchantData") @@ -726,6 +799,62 @@ public void shouldSupportAllFeastTypes() throws IOException { .allMatch(status -> status.equals(GetOnlineFeaturesResponse.FieldStatus.PRESENT)); } + @Test + public void shouldRegisterSuperLongEntityAndGetOnlineFeatures() { + // getOnlineFeatures Information + String projectName = "default"; + String entityName = "this_is_a_long_long_long_long_long_long_entity_id"; + ValueProto.Value entityValue = ValueProto.Value.newBuilder().setInt64Val(1).build(); + + // Instantiate EntityRows + GetOnlineFeaturesRequestV2.EntityRow entityRow1 = + DataGenerator.createEntityRow(entityName, DataGenerator.createInt64Value(1), 100); + ImmutableList entityRows = ImmutableList.of(entityRow1); + + // Instantiate FeatureReferences + FeatureReferenceV2 featureReference = + DataGenerator.createFeatureReference("superlong", "trip_cost"); + FeatureReferenceV2 notFoundFeatureReference = + DataGenerator.createFeatureReference("superlong", "trip_transaction"); + + ImmutableList featureReferences = + ImmutableList.of(featureReference, notFoundFeatureReference); + + // Build GetOnlineFeaturesRequestV2 + GetOnlineFeaturesRequestV2 onlineFeatureRequest = + TestUtils.createOnlineFeatureRequest(projectName, featureReferences, entityRows); + GetOnlineFeaturesResponse featureResponse = + servingStub.getOnlineFeaturesV2(onlineFeatureRequest); + + ImmutableMap expectedValueMap = + ImmutableMap.of( + entityName, + entityValue, + FeatureV2.getFeatureStringRef(featureReference), + DataGenerator.createInt64Value(5), + FeatureV2.getFeatureStringRef(notFoundFeatureReference), + DataGenerator.createEmptyValue()); + + ImmutableMap expectedStatusMap = + ImmutableMap.of( + entityName, + GetOnlineFeaturesResponse.FieldStatus.PRESENT, + FeatureV2.getFeatureStringRef(featureReference), + GetOnlineFeaturesResponse.FieldStatus.PRESENT, + FeatureV2.getFeatureStringRef(notFoundFeatureReference), + GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND); + + GetOnlineFeaturesResponse.FieldValues expectedFieldValues = + GetOnlineFeaturesResponse.FieldValues.newBuilder() + .putAllFields(expectedValueMap) + .putAllStatuses(expectedStatusMap) + .build(); + ImmutableList expectedFieldValuesList = + ImmutableList.of(expectedFieldValues); + + assertEquals(expectedFieldValuesList, featureResponse.getFieldValuesList()); + } + @TestConfiguration public static class TestConfig { @Bean diff --git a/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java b/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java index f97a9e0..9b9de7b 100644 --- a/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java +++ b/storage/connectors/cassandra/src/main/java/feast/storage/connectors/cassandra/retriever/CassandraOnlineRetriever.java @@ -51,6 +51,7 @@ public class CassandraOnlineRetriever implements SSTableOnlineRetriever enti .getBytes()); } + /** + * Generate Cassandra table name, with limit of 48 characters. + * + * @param project Name of Feast project + * @param entityNames List of entities used in retrieval call + * @return Cassandra table name for retrieval + */ + @Override + public String getSSTable(String project, List entityNames) { + String tableName = String.format("%s__%s", project, String.join("__", entityNames)); + return trimAndHash(tableName, MAX_TABLE_NAME_LENGTH); + } + /** * Converts Cassandra rows into @NativeFeature type. * diff --git a/storage/connectors/sstable/src/main/java/feast/storage/connectors/sstable/retriever/SSTableOnlineRetriever.java b/storage/connectors/sstable/src/main/java/feast/storage/connectors/sstable/retriever/SSTableOnlineRetriever.java index 957f0d3..c86923a 100644 --- a/storage/connectors/sstable/src/main/java/feast/storage/connectors/sstable/retriever/SSTableOnlineRetriever.java +++ b/storage/connectors/sstable/src/main/java/feast/storage/connectors/sstable/retriever/SSTableOnlineRetriever.java @@ -16,6 +16,7 @@ */ package feast.storage.connectors.sstable.retriever; +import com.google.common.hash.Hashing; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow; import feast.proto.types.ValueProto; @@ -31,6 +32,8 @@ */ public interface SSTableOnlineRetriever extends OnlineRetrieverV2 { + int MAX_TABLE_NAME_LENGTH = 50; + @Override default List> getOnlineFeatures( String project, @@ -93,7 +96,8 @@ List> convertRowToFeature( * @return Name of Cassandra table */ default String getSSTable(String project, List entityNames) { - return String.format("%s__%s", project, String.join("__", entityNames)); + return trimAndHash( + String.format("%s__%s", project, String.join("__", entityNames)), MAX_TABLE_NAME_LENGTH); } /** @@ -137,4 +141,23 @@ default List getSSTableColumns(List featureReference .distinct() .collect(Collectors.toList()); } + + /** + * Trims long SSTable table names and appends hash suffix for uniqueness. + * + * @param expr Original SSTable table name + * @param maxLength Maximum length allowed for SSTable + * @return Hashed suffix SSTable table name + */ + default String trimAndHash(String expr, int maxLength) { + // Length 8 as derived from murmurhash_32 implementation + int maxPrefixLength = maxLength - 8; + String finalName = expr; + if (expr.length() > maxLength) { + String hashSuffix = + Hashing.murmur3_32().hashBytes(expr.substring(maxPrefixLength).getBytes()).toString(); + finalName = expr.substring(0, Math.min(expr.length(), maxPrefixLength)).concat(hashSuffix); + } + return finalName; + } }