Skip to content

Commit 4c7757a

Browse files
committed
feat: add typed_features field to PushRequest and WriteToOnlineStoreRequest (#6117)
Signed-off-by: Nick Quinn <nicholas_quinn@apple.com>
1 parent 1e5b60f commit 4c7757a

File tree

96 files changed

+7181
-4682
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+7181
-4682
lines changed

protos/feast/serving/GrpcServer.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
syntax = "proto3";
22

33
import "feast/serving/ServingService.proto";
4+
import "feast/types/Value.proto";
45

56
option java_package = "feast.proto.serving";
67
option java_outer_classname = "GrpcServerAPIProto";
@@ -11,6 +12,7 @@ message PushRequest {
1112
string stream_feature_view = 2;
1213
bool allow_registry_cache = 3;
1314
string to = 4;
15+
map<string, feast.types.Value> typed_features = 5;
1416
}
1517

1618
message PushResponse {
@@ -21,6 +23,7 @@ message WriteToOnlineStoreRequest {
2123
map<string, string> features = 1;
2224
string feature_view_name = 2;
2325
bool allow_registry_cache = 3;
26+
map<string, feast.types.Value> typed_features = 4;
2427
}
2528

2629
message WriteToOnlineStoreResponse {

sdk/python/feast/infra/contrib/grpc_server.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ def parse(features):
3434
return pd.DataFrame.from_dict(df)
3535

3636

37+
def parse_typed(typed_features):
38+
df = {}
39+
for key, value in typed_features.items():
40+
val_case = value.WhichOneof("val")
41+
df[key] = [getattr(value, val_case) if val_case is not None else None]
42+
return pd.DataFrame.from_dict(df)
43+
44+
3745
class GrpcFeatureServer(GrpcFeatureServerServicer):
3846
fs: FeatureStore
3947

@@ -49,7 +57,13 @@ def __init__(self, fs: FeatureStore, registry_ttl_sec: int = 5):
4957

5058
def Push(self, request, context):
5159
try:
52-
df = parse(request.features)
60+
if request.features and request.typed_features:
61+
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
62+
context.set_details(
63+
"Only one of features or typed_features may be set, not both"
64+
)
65+
return PushResponse(status=False)
66+
df = parse_typed(request.typed_features) if request.typed_features else parse(request.features)
5367
if request.to == "offline":
5468
to = PushMode.OFFLINE
5569
elif request.to == "online":
@@ -84,7 +98,13 @@ def WriteToOnlineStore(self, request, context):
8498
"write_to_online_store is deprecated. Please consider using Push instead"
8599
)
86100
try:
87-
df = parse(request.features)
101+
if request.features and request.typed_features:
102+
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
103+
context.set_details(
104+
"Only one of features or typed_features may be set, not both"
105+
)
106+
return WriteToOnlineStoreResponse(status=False)
107+
df = parse_typed(request.typed_features) if request.typed_features else parse(request.features)
88108
self.fs.write_to_online_store(
89109
feature_view_name=request.feature_view_name,
90110
df=df,
@@ -94,7 +114,7 @@ def WriteToOnlineStore(self, request, context):
94114
logger.exception(str(e))
95115
context.set_code(grpc.StatusCode.INTERNAL)
96116
context.set_details(str(e))
97-
return PushResponse(status=False)
117+
return WriteToOnlineStoreResponse(status=False)
98118
return WriteToOnlineStoreResponse(status=True)
99119

100120
def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context):

sdk/python/feast/protos/feast/core/Aggregation_pb2.py

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,49 @@
22
@generated by mypy-protobuf. Do not edit manually!
33
isort:skip_file
44
"""
5-
import builtins
6-
import google.protobuf.descriptor
7-
import google.protobuf.duration_pb2
8-
import google.protobuf.message
5+
6+
from google.protobuf import descriptor as _descriptor
7+
from google.protobuf import duration_pb2 as _duration_pb2
8+
from google.protobuf import message as _message
9+
import builtins as _builtins
910
import sys
11+
import typing as _typing
1012

11-
if sys.version_info >= (3, 8):
12-
import typing as typing_extensions
13+
if sys.version_info >= (3, 10):
14+
from typing import TypeAlias as _TypeAlias
1315
else:
14-
import typing_extensions
16+
from typing_extensions import TypeAlias as _TypeAlias
1517

16-
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
18+
DESCRIPTOR: _descriptor.FileDescriptor
1719

18-
class Aggregation(google.protobuf.message.Message):
19-
DESCRIPTOR: google.protobuf.descriptor.Descriptor
20+
@_typing.final
21+
class Aggregation(_message.Message):
22+
DESCRIPTOR: _descriptor.Descriptor
2023

21-
COLUMN_FIELD_NUMBER: builtins.int
22-
FUNCTION_FIELD_NUMBER: builtins.int
23-
TIME_WINDOW_FIELD_NUMBER: builtins.int
24-
SLIDE_INTERVAL_FIELD_NUMBER: builtins.int
25-
NAME_FIELD_NUMBER: builtins.int
26-
column: builtins.str
27-
function: builtins.str
28-
name: builtins.str
29-
@property
30-
def time_window(self) -> google.protobuf.duration_pb2.Duration: ...
31-
@property
32-
def slide_interval(self) -> google.protobuf.duration_pb2.Duration: ...
24+
COLUMN_FIELD_NUMBER: _builtins.int
25+
FUNCTION_FIELD_NUMBER: _builtins.int
26+
TIME_WINDOW_FIELD_NUMBER: _builtins.int
27+
SLIDE_INTERVAL_FIELD_NUMBER: _builtins.int
28+
NAME_FIELD_NUMBER: _builtins.int
29+
column: _builtins.str
30+
function: _builtins.str
31+
name: _builtins.str
32+
@_builtins.property
33+
def time_window(self) -> _duration_pb2.Duration: ...
34+
@_builtins.property
35+
def slide_interval(self) -> _duration_pb2.Duration: ...
3336
def __init__(
3437
self,
3538
*,
36-
column: builtins.str = ...,
37-
function: builtins.str = ...,
38-
time_window: google.protobuf.duration_pb2.Duration | None = ...,
39-
slide_interval: google.protobuf.duration_pb2.Duration | None = ...,
40-
name: builtins.str = ...,
39+
column: _builtins.str = ...,
40+
function: _builtins.str = ...,
41+
time_window: _duration_pb2.Duration | None = ...,
42+
slide_interval: _duration_pb2.Duration | None = ...,
43+
name: _builtins.str = ...,
4144
) -> None: ...
42-
def HasField(self, field_name: typing_extensions.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"]) -> builtins.bool: ...
43-
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...
45+
_HasFieldArgType: _TypeAlias = _typing.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"] # noqa: Y015
46+
def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ...
47+
_ClearFieldArgType: _TypeAlias = _typing.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"] # noqa: Y015
48+
def ClearField(self, field_name: _ClearFieldArgType) -> None: ...
4449

45-
global___Aggregation = Aggregation
50+
Global___Aggregation: _TypeAlias = Aggregation # noqa: Y015
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,24 @@
11
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
22
"""Client and server classes corresponding to protobuf-defined services."""
33
import grpc
4+
import warnings
45

6+
7+
GRPC_GENERATED_VERSION = '1.78.0'
8+
GRPC_VERSION = grpc.__version__
9+
_version_not_supported = False
10+
11+
try:
12+
from grpc._utilities import first_version_is_lower
13+
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
14+
except ImportError:
15+
_version_not_supported = True
16+
17+
if _version_not_supported:
18+
raise RuntimeError(
19+
f'The grpc package installed is at version {GRPC_VERSION},'
20+
+ ' but the generated code in feast/core/Aggregation_pb2_grpc.py depends on'
21+
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
22+
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
23+
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
24+
)

sdk/python/feast/protos/feast/core/DataFormat_pb2.py

Lines changed: 17 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)