Skip to content

Commit e2bbbfd

Browse files
authored
fix redis key serialization (feast-dev#2264)
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 7c53177 commit e2bbbfd

File tree

5 files changed

+77
-52
lines changed

5 files changed

+77
-52
lines changed

java/serving/src/test/java/feast/serving/it/ServingBaseTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,5 +157,28 @@ public void shouldRefreshRegistryAndServeNewFeatures() throws InterruptedExcepti
157157
equalTo(3));
158158
}
159159

160+
/** https://github.com/feast-dev/feast/issues/2253 */
161+
@Test
162+
public void shouldGetOnlineFeaturesWithStringEntity() {
163+
Map<String, ValueProto.RepeatedValue> entityRows =
164+
ImmutableMap.of(
165+
"entity",
166+
ValueProto.RepeatedValue.newBuilder()
167+
.addVal(DataGenerator.createStrValue("key-1"))
168+
.build());
169+
170+
ImmutableList<String> featureReferences =
171+
ImmutableList.of("feature_view_0:feature_0", "feature_view_0:feature_1");
172+
173+
ServingAPIProto.GetOnlineFeaturesRequest req =
174+
TestUtils.createOnlineFeatureRequest(featureReferences, entityRows);
175+
176+
ServingAPIProto.GetOnlineFeaturesResponse resp = servingStub.getOnlineFeatures(req);
177+
178+
for (final int featureIdx : List.of(0, 1)) {
179+
assertEquals(FieldStatus.PRESENT, resp.getResults(featureIdx).getStatuses(0));
180+
}
181+
}
182+
160183
abstract void updateRegistryFile(RegistryProto.Registry registry);
161184
}

java/serving/src/test/java/feast/serving/it/ServingBenchmarkIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ protected ServingAPIProto.GetOnlineFeaturesRequest buildOnlineRequest(
5151
int rowsCount, int featuresCount) {
5252
List<ValueProto.Value> entities =
5353
IntStream.range(0, rowsCount)
54-
.mapToObj(i -> DataGenerator.createInt64Value(rand.nextInt(1000)))
54+
.mapToObj(
55+
i -> DataGenerator.createStrValue(String.format("key-%s", rand.nextInt(1000))))
5556
.collect(Collectors.toList());
5657

5758
List<String> featureReferences =

java/serving/src/test/resources/docker-compose/feast10/definitions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame:
7373

7474
entity = Entity(
7575
name="entity",
76-
value_type=ValueType.INT64,
76+
value_type=ValueType.STRING,
7777
)
7878

7979
benchmark_feature_views = [

java/serving/src/test/resources/docker-compose/feast10/materialize.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,26 @@
2828
# for more info.
2929
df.to_parquet("driver_stats.parquet")
3030

31+
3132
# For Benchmarks
3233
# Please read more in Feast RFC-031
3334
# (link https://docs.google.com/document/d/12UuvTQnTTCJhdRgy6h10zSbInNGSyEJkIxpOcgOen1I/edit)
3435
# about this benchmark setup
35-
def generate_data(num_rows: int, num_features: int, key_space: int, destination: str) -> pd.DataFrame:
36+
def generate_data(num_rows: int, num_features: int, destination: str) -> pd.DataFrame:
3637
features = [f"feature_{i}" for i in range(num_features)]
3738
columns = ["entity", "event_timestamp"] + features
3839
df = pd.DataFrame(0, index=np.arange(num_rows), columns=columns)
3940
df["event_timestamp"] = datetime.utcnow()
40-
for column in ["entity"] + features:
41-
df[column] = np.random.randint(1, key_space, num_rows)
41+
for column in features:
42+
df[column] = np.random.randint(1, num_rows, num_rows)
43+
44+
df["entity"] = "key-" + \
45+
pd.Series(np.arange(1, num_rows + 1)).astype(pd.StringDtype())
4246

4347
df.to_parquet(destination)
4448

4549

46-
generate_data(10**3, 250, 10**3, "benchmark_data.parquet")
50+
generate_data(10**3, 250, "benchmark_data.parquet")
4751

4852

4953
fs = FeatureStore(".")

java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,14 @@
1616
*/
1717
package feast.storage.connectors.redis.retriever;
1818

19-
import com.google.common.primitives.UnsignedBytes;
2019
import com.google.protobuf.ProtocolStringList;
2120
import feast.proto.storage.RedisProto;
2221
import feast.proto.types.ValueProto;
2322
import java.nio.ByteBuffer;
2423
import java.nio.ByteOrder;
2524
import java.nio.charset.StandardCharsets;
26-
import java.util.ArrayList;
27-
import java.util.Comparator;
28-
import java.util.List;
25+
import java.util.*;
26+
import org.apache.commons.lang3.ArrayUtils;
2927
import org.apache.commons.lang3.tuple.Pair;
3028

3129
// This is derived from
@@ -48,70 +46,52 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) {
4846
}
4947
tuples.sort(Comparator.comparing(Pair::getLeft));
5048

51-
ByteBuffer stringBytes = ByteBuffer.allocate(Integer.BYTES);
52-
stringBytes.order(ByteOrder.LITTLE_ENDIAN);
53-
stringBytes.putInt(ValueProto.ValueType.Enum.STRING.getNumber());
54-
5549
for (Pair<String, ValueProto.Value> pair : tuples) {
56-
for (final byte b : stringBytes.array()) {
57-
buffer.add(b);
58-
}
59-
for (final byte b : pair.getLeft().getBytes(StandardCharsets.UTF_8)) {
60-
buffer.add(b);
61-
}
50+
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.STRING.getNumber()));
51+
buffer.addAll(encodeString(pair.getLeft()));
6252
}
6353

6454
for (Pair<String, ValueProto.Value> pair : tuples) {
6555
final ValueProto.Value val = pair.getRight();
6656
switch (val.getValCase()) {
6757
case STRING_VAL:
68-
buffer.add(UnsignedBytes.checkedCast(ValueProto.ValueType.Enum.STRING.getNumber()));
69-
buffer.add(
70-
UnsignedBytes.checkedCast(
71-
val.getStringVal().getBytes(StandardCharsets.UTF_8).length));
72-
for (final byte b : val.getStringVal().getBytes(StandardCharsets.UTF_8)) {
73-
buffer.add(b);
74-
}
58+
String stringVal = val.getStringVal();
59+
60+
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.STRING.getNumber()));
61+
buffer.addAll(encodeInteger(stringVal.length()));
62+
buffer.addAll(encodeString(stringVal));
63+
7564
break;
7665
case BYTES_VAL:
77-
buffer.add(UnsignedBytes.checkedCast(ValueProto.ValueType.Enum.BYTES.getNumber()));
78-
for (final byte b : val.getBytesVal().toByteArray()) {
79-
buffer.add(b);
80-
}
66+
byte[] bytes = val.getBytesVal().toByteArray();
67+
68+
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.BYTES.getNumber()));
69+
buffer.addAll(encodeInteger(bytes.length));
70+
buffer.addAll(encodeBytes(bytes));
71+
8172
break;
8273
case INT32_VAL:
83-
ByteBuffer int32ByteBuffer =
84-
ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + Integer.BYTES);
85-
int32ByteBuffer.order(ByteOrder.LITTLE_ENDIAN);
86-
int32ByteBuffer.putInt(ValueProto.ValueType.Enum.INT32.getNumber());
87-
int32ByteBuffer.putInt(Integer.BYTES);
88-
int32ByteBuffer.putInt(val.getInt32Val());
89-
for (final byte b : int32ByteBuffer.array()) {
90-
buffer.add(b);
91-
}
74+
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.INT32.getNumber()));
75+
buffer.addAll(encodeInteger(Integer.BYTES));
76+
buffer.addAll(encodeInteger(val.getInt32Val()));
77+
9278
break;
9379
case INT64_VAL:
94-
ByteBuffer int64ByteBuffer =
95-
ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + Integer.BYTES);
96-
int64ByteBuffer.order(ByteOrder.LITTLE_ENDIAN);
97-
int64ByteBuffer.putInt(ValueProto.ValueType.Enum.INT64.getNumber());
98-
int64ByteBuffer.putInt(Integer.BYTES);
80+
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.INT64.getNumber()));
81+
buffer.addAll(encodeInteger(Integer.BYTES));
9982
/* This is super dumb - but in https://github.com/feast-dev/feast/blob/dcae1606f53028ce5413567fb8b66f92cfef0f8e/sdk/python/feast/infra/key_encoding_utils.py#L9
10083
we use `struct.pack("<l", v.int64_val)` to get the bytes of an int64 val. This actually extracts only 4 bytes,
10184
instead of 8 bytes as you'd expect from to serialize an int64 value.
10285
*/
103-
int64ByteBuffer.putInt(Long.valueOf(val.getInt64Val()).intValue());
104-
for (final byte b : int64ByteBuffer.array()) {
105-
buffer.add(b);
106-
}
86+
buffer.addAll(encodeInteger(((Long) val.getInt64Val()).intValue()));
87+
10788
break;
10889
default:
10990
throw new RuntimeException("Unable to serialize Entity Key");
11091
}
11192
}
112-
for (final byte b : entityKey.getProject().getBytes(StandardCharsets.UTF_8)) {
113-
buffer.add(b);
114-
}
93+
94+
buffer.addAll(encodeString(entityKey.getProject()));
11595

11696
final byte[] bytes = new byte[buffer.size()];
11797
for (int i = 0; i < buffer.size(); i++) {
@@ -120,4 +100,21 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) {
120100

121101
return bytes;
122102
}
103+
104+
private List<Byte> encodeBytes(byte[] toByteArray) {
105+
return Arrays.asList(ArrayUtils.toObject(toByteArray));
106+
}
107+
108+
private List<Byte> encodeInteger(Integer value) {
109+
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
110+
buffer.order(ByteOrder.LITTLE_ENDIAN);
111+
buffer.putInt(value);
112+
113+
return Arrays.asList(ArrayUtils.toObject(buffer.array()));
114+
}
115+
116+
private List<Byte> encodeString(String value) {
117+
byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8);
118+
return encodeBytes(stringBytes);
119+
}
123120
}

0 commit comments

Comments
 (0)