Skip to content

Commit eeaa6db

Browse files
nquinn408nickquinn408franciscojavierarceo
authored
feat: Add typed_features field to grpc write request ((#6117) (#6118)
* feat: add typed_features field to PushRequest and WriteToOnlineStoreRequest (#6117) Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Devin feedback Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * feat: Add typed_features field to grpc write request (#6117) Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * fix: Revert generated proto files to CI-compatible version Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * fix: Fix GrpcServer_pb2 typed_features stub and import paths Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * style: Apply ruff line-length formatting to grpc_server.py Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * Unwrap compound protobuf Value types in parse_typed Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> * feat: Support distinct count aggregation [#6116] Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> --------- Signed-off-by: Nick Quinn <nicholas_quinn@apple.com> Co-authored-by: Nick Quinn <nicholas_quinn@apple.com> Co-authored-by: Francisco Javier Arceo <arceofrancisco@gmail.com>
1 parent 4844488 commit eeaa6db

File tree

6 files changed

+275
-22
lines changed

6 files changed

+275
-22
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,6 @@ Desktop.ini
264264

265265
# AgentReady reports
266266
.agentready/
267+
268+
# Claude Code project settings
269+
.claude/

protos/feast/serving/GrpcServer.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
syntax = "proto3";
22

33
import "feast/serving/ServingService.proto";
4+
import "feast/types/Value.proto";
45

56
option java_package = "feast.proto.serving";
67
option java_outer_classname = "GrpcServerAPIProto";
@@ -11,6 +12,7 @@ message PushRequest {
1112
string stream_feature_view = 2;
1213
bool allow_registry_cache = 3;
1314
string to = 4;
15+
map<string, feast.types.Value> typed_features = 5;
1416
}
1517

1618
message PushResponse {
@@ -21,6 +23,7 @@ message WriteToOnlineStoreRequest {
2123
map<string, string> features = 1;
2224
string feature_view_name = 2;
2325
bool allow_registry_cache = 3;
26+
map<string, feast.types.Value> typed_features = 4;
2427
}
2528

2629
message WriteToOnlineStoreResponse {

sdk/python/feast/infra/contrib/grpc_server.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import logging
22
import threading
3+
from collections.abc import Mapping
34
from concurrent import futures
45
from typing import Optional, Union
56

67
import grpc
78
import pandas as pd
8-
from grpc_health.v1 import health, health_pb2_grpc
99

1010
from feast.data_source import PushMode
1111
from feast.errors import FeatureServiceNotFoundException, PushSourceNotFoundException
@@ -34,6 +34,20 @@ def parse(features):
3434
return pd.DataFrame.from_dict(df)
3535

3636

37+
def parse_typed(typed_features):
38+
df = {}
39+
for key, value in typed_features.items():
40+
val_case = value.WhichOneof("val")
41+
if val_case is None or val_case == "null_val":
42+
df[key] = [None]
43+
else:
44+
raw = getattr(value, val_case)
45+
if hasattr(raw, "val"):
46+
raw = dict(raw.val) if isinstance(raw.val, Mapping) else list(raw.val)
47+
df[key] = [raw]
48+
return pd.DataFrame.from_dict(df)
49+
50+
3751
class GrpcFeatureServer(GrpcFeatureServerServicer):
3852
fs: FeatureStore
3953

@@ -49,7 +63,17 @@ def __init__(self, fs: FeatureStore, registry_ttl_sec: int = 5):
4963

5064
def Push(self, request, context):
5165
try:
52-
df = parse(request.features)
66+
if request.features and request.typed_features:
67+
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
68+
context.set_details(
69+
"Only one of features or typed_features may be set, not both"
70+
)
71+
return PushResponse(status=False)
72+
df = (
73+
parse_typed(request.typed_features)
74+
if request.typed_features
75+
else parse(request.features)
76+
)
5377
if request.to == "offline":
5478
to = PushMode.OFFLINE
5579
elif request.to == "online":
@@ -62,7 +86,7 @@ def Push(self, request, context):
6286
f"'online_and_offline']."
6387
)
6488
self.fs.push(
65-
push_source_name=request.push_source_name,
89+
push_source_name=request.stream_feature_view,
6690
df=df,
6791
allow_registry_cache=request.allow_registry_cache,
6892
to=to,
@@ -84,7 +108,17 @@ def WriteToOnlineStore(self, request, context):
84108
"write_to_online_store is deprecated. Please consider using Push instead"
85109
)
86110
try:
87-
df = parse(request.features)
111+
if request.features and request.typed_features:
112+
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
113+
context.set_details(
114+
"Only one of features or typed_features may be set, not both"
115+
)
116+
return WriteToOnlineStoreResponse(status=False)
117+
df = (
118+
parse_typed(request.typed_features)
119+
if request.typed_features
120+
else parse(request.features)
121+
)
88122
self.fs.write_to_online_store(
89123
feature_view_name=request.feature_view_name,
90124
df=df,
@@ -94,7 +128,7 @@ def WriteToOnlineStore(self, request, context):
94128
logger.exception(str(e))
95129
context.set_code(grpc.StatusCode.INTERNAL)
96130
context.set_details(str(e))
97-
return PushResponse(status=False)
131+
return WriteToOnlineStoreResponse(status=False)
98132
return WriteToOnlineStoreResponse(status=True)
99133

100134
def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context):
@@ -136,6 +170,8 @@ def get_grpc_server(
136170
max_workers: int,
137171
registry_ttl_sec: int,
138172
):
173+
from grpc_health.v1 import health, health_pb2_grpc
174+
139175
logger.info(f"Initializing gRPC server on {address}")
140176
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
141177
add_GrpcFeatureServerServicer_to_server(

sdk/python/feast/protos/feast/serving/GrpcServer_pb2.py

Lines changed: 24 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/feast/protos/feast/serving/GrpcServer_pb2.pyi

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ isort:skip_file
44
"""
55
import builtins
66
import collections.abc
7+
import feast.protos.feast.types.Value_pb2
78
import google.protobuf.descriptor
89
import google.protobuf.internal.containers
910
import google.protobuf.message
@@ -34,24 +35,45 @@ class PushRequest(google.protobuf.message.Message):
3435
) -> None: ...
3536
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...
3637

38+
class TypedFeaturesEntry(google.protobuf.message.Message):
39+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
40+
41+
KEY_FIELD_NUMBER: builtins.int
42+
VALUE_FIELD_NUMBER: builtins.int
43+
key: builtins.str
44+
@property
45+
def value(self) -> feast.protos.feast.types.Value_pb2.Value: ...
46+
def __init__(
47+
self,
48+
*,
49+
key: builtins.str = ...,
50+
value: feast.protos.feast.types.Value_pb2.Value | None = ...,
51+
) -> None: ...
52+
def HasField(self, field_name: typing_extensions.Literal["value", b"value"]) -> builtins.bool: ...
53+
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...
54+
3755
FEATURES_FIELD_NUMBER: builtins.int
3856
STREAM_FEATURE_VIEW_FIELD_NUMBER: builtins.int
3957
ALLOW_REGISTRY_CACHE_FIELD_NUMBER: builtins.int
4058
TO_FIELD_NUMBER: builtins.int
59+
TYPED_FEATURES_FIELD_NUMBER: builtins.int
4160
@property
4261
def features(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
4362
stream_feature_view: builtins.str
4463
allow_registry_cache: builtins.bool
4564
to: builtins.str
65+
@property
66+
def typed_features(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, feast.protos.feast.types.Value_pb2.Value]: ...
4667
def __init__(
4768
self,
4869
*,
4970
features: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
5071
stream_feature_view: builtins.str = ...,
5172
allow_registry_cache: builtins.bool = ...,
5273
to: builtins.str = ...,
74+
typed_features: collections.abc.Mapping[builtins.str, feast.protos.feast.types.Value_pb2.Value] | None = ...,
5375
) -> None: ...
54-
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "features", b"features", "stream_feature_view", b"stream_feature_view", "to", b"to"]) -> None: ...
76+
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "features", b"features", "stream_feature_view", b"stream_feature_view", "to", b"to", "typed_features", b"typed_features"]) -> None: ...
5577

5678
global___PushRequest = PushRequest
5779

@@ -87,21 +109,42 @@ class WriteToOnlineStoreRequest(google.protobuf.message.Message):
87109
) -> None: ...
88110
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...
89111

112+
class TypedFeaturesEntry(google.protobuf.message.Message):
113+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
114+
115+
KEY_FIELD_NUMBER: builtins.int
116+
VALUE_FIELD_NUMBER: builtins.int
117+
key: builtins.str
118+
@property
119+
def value(self) -> feast.protos.feast.types.Value_pb2.Value: ...
120+
def __init__(
121+
self,
122+
*,
123+
key: builtins.str = ...,
124+
value: feast.protos.feast.types.Value_pb2.Value | None = ...,
125+
) -> None: ...
126+
def HasField(self, field_name: typing_extensions.Literal["value", b"value"]) -> builtins.bool: ...
127+
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...
128+
90129
FEATURES_FIELD_NUMBER: builtins.int
91130
FEATURE_VIEW_NAME_FIELD_NUMBER: builtins.int
92131
ALLOW_REGISTRY_CACHE_FIELD_NUMBER: builtins.int
132+
TYPED_FEATURES_FIELD_NUMBER: builtins.int
93133
@property
94134
def features(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
95135
feature_view_name: builtins.str
96136
allow_registry_cache: builtins.bool
137+
@property
138+
def typed_features(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, feast.protos.feast.types.Value_pb2.Value]: ...
97139
def __init__(
98140
self,
99141
*,
100142
features: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
101143
feature_view_name: builtins.str = ...,
102144
allow_registry_cache: builtins.bool = ...,
145+
typed_features: collections.abc.Mapping[builtins.str, feast.protos.feast.types.Value_pb2.Value] | None = ...,
103146
) -> None: ...
104-
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "feature_view_name", b"feature_view_name", "features", b"features"]) -> None: ...
147+
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "feature_view_name", b"feature_view_name", "features", b"features", "typed_features", b"typed_features"]) -> None: ...
105148

106149
global___WriteToOnlineStoreRequest = WriteToOnlineStoreRequest
107150

0 commit comments

Comments
 (0)