Skip to content

Commit 745a1b4

Browse files
author
Tsotne Tabidze
authored
Local feature server implementation (HTTP endpoint) (#1780)
* Local feature server implementation (HTTP endpoint) Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai> * Update the request protobufs and hack json/protobuf conversion to avoid extra structure (e.g. "int64_val") in json Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai> * Revert update to the service Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
1 parent 954565e commit 745a1b4

File tree

9 files changed

+427
-5
lines changed

9 files changed

+427
-5
lines changed

protos/feast/serving/ServingService.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,22 @@ message GetOnlineFeaturesRequestV2 {
8181
}
8282
}
8383

84+
// In JSON "val" field can be omitted
85+
message FeatureList {
86+
repeated string val = 1;
87+
}
88+
89+
message GetOnlineFeaturesRequest {
90+
oneof kind {
91+
string feature_service = 1;
92+
FeatureList features = 2;
93+
}
94+
// The entity data is specified in a columnar format
95+
// A map of entity name -> list of values
96+
map<string, feast.types.RepeatedValue> entities = 3;
97+
bool full_feature_names = 4;
98+
}
99+
84100
message GetOnlineFeaturesResponse {
85101
// Feature values retrieved from feast.
86102
repeated FieldValues field_values = 1;

protos/feast/types/Value.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ message ValueType {
4141
FLOAT_LIST = 16;
4242
BOOL_LIST = 17;
4343
UNIX_TIMESTAMP_LIST = 18;
44+
NULL = 19;
4445
}
4546
}
4647

4748
message Value {
4849
// ValueType is referenced by the metadata types, FeatureInfo and EntityInfo.
4950
// The enum values do not have to match the oneof val field ids, but they should.
51+
// In JSON "*_val" field can be omitted
5052
oneof val {
5153
bytes bytes_val = 1;
5254
string string_val = 2;
@@ -64,9 +66,14 @@ message Value {
6466
FloatList float_list_val = 16;
6567
BoolList bool_list_val = 17;
6668
Int64List unix_timestamp_list_val = 18;
69+
Null null_val = 19;
6770
}
6871
}
6972

73+
enum Null {
74+
NULL = 0;
75+
}
76+
7077
message BytesList {
7178
repeated bytes val = 1;
7279
}
@@ -94,3 +101,9 @@ message FloatList {
94101
message BoolList {
95102
repeated bool val = 1;
96103
}
104+
105+
// This is to avoid an issue of being unable to specify `repeated value` in oneofs or maps
106+
// In JSON "val" field can be omitted
107+
message RepeatedValue {
108+
repeated Value val = 1;
109+
}

sdk/python/feast/cli.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,5 +357,19 @@ def init_command(project_directory, minimal: bool, template: str):
357357
init_repo(project_directory, template)
358358

359359

360+
@cli.command("serve")
361+
@click.option(
362+
"--port", "-p", type=click.INT, default=6566, help="Specify a port for the server"
363+
)
364+
@click.pass_context
365+
def serve_command(ctx: click.Context, port: int):
366+
"""Start a the feature consumption server locally on a given port."""
367+
repo = ctx.obj["CHDIR"]
368+
cli_check_repo(repo)
369+
store = FeatureStore(repo_path=str(repo))
370+
371+
store.serve(port)
372+
373+
360374
if __name__ == "__main__":
361375
cli()

sdk/python/feast/feature_server.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import uvicorn
2+
from fastapi import FastAPI, HTTPException, Request
3+
from fastapi.logger import logger
4+
from google.protobuf.json_format import MessageToDict, Parse
5+
6+
import feast
7+
from feast import proto_json
8+
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
9+
10+
11+
def get_app(store: "feast.FeatureStore"):
12+
proto_json.patch()
13+
14+
app = FastAPI()
15+
16+
@app.get("/get-online-features/")
17+
async def get_online_features(request: Request):
18+
try:
19+
# Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
20+
body = await request.body()
21+
request_proto = GetOnlineFeaturesRequest()
22+
Parse(body, request_proto)
23+
24+
# Initialize parameters for FeatureStore.get_online_features(...) call
25+
if request_proto.HasField("feature_service"):
26+
features = store.get_feature_service(request_proto.feature_service)
27+
else:
28+
features = list(request_proto.features.val)
29+
30+
full_feature_names = request_proto.full_feature_names
31+
32+
batch_sizes = [len(v.val) for v in request_proto.entities.values()]
33+
num_entities = batch_sizes[0]
34+
if any(batch_size != num_entities for batch_size in batch_sizes):
35+
raise HTTPException(status_code=500, detail="Uneven number of columns")
36+
37+
entity_rows = [
38+
{k: v.val[idx] for k, v in request_proto.entities.items()}
39+
for idx in range(num_entities)
40+
]
41+
42+
response_proto = store.get_online_features(
43+
features, entity_rows, full_feature_names=full_feature_names
44+
).proto
45+
46+
# Convert the Protobuf object to JSON and return it
47+
return MessageToDict(response_proto, preserving_proto_field_name=True)
48+
except Exception as e:
49+
# Print the original exception on the server side
50+
logger.exception(e)
51+
# Raise HTTPException to return the error message to the client
52+
raise HTTPException(status_code=500, detail=str(e))
53+
54+
return app
55+
56+
57+
def start_server(store: "feast.FeatureStore", port: int):
58+
app = get_app(store)
59+
uvicorn.run(app, host="127.0.0.1", port=port)

sdk/python/feast/feature_store.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from colorama import Fore, Style
2323
from tqdm import tqdm
2424

25-
from feast import utils
25+
from feast import feature_server, utils
2626
from feast.entity import Entity
2727
from feast.errors import (
2828
EntityNotFoundException,
@@ -761,6 +761,11 @@ def get_online_features(
761761

762762
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))
763763

764+
@log_exceptions_and_usage
765+
def serve(self, port: int) -> None:
766+
"""Start the feature consumption server locally on a given port."""
767+
feature_server.start_server(self, port)
768+
764769

765770
def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyProto:
766771
names, values = zip(*row.fields.items())

sdk/python/feast/proto_json.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import uuid
2+
from typing import Any, Callable, Type
3+
4+
from google.protobuf.json_format import ( # type: ignore
5+
_WKTJSONMETHODS,
6+
ParseError,
7+
_Parser,
8+
_Printer,
9+
)
10+
11+
from feast.protos.feast.serving.ServingService_pb2 import FeatureList
12+
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
13+
14+
ProtoMessage = Any
15+
JsonObject = Any
16+
17+
18+
def _patch_proto_json_encoding(
19+
proto_type: Type[ProtoMessage],
20+
to_json_object: Callable[[_Printer, ProtoMessage], JsonObject],
21+
from_json_object: Callable[[_Parser, JsonObject, ProtoMessage], None],
22+
) -> None:
23+
"""Patch Protobuf JSON Encoder / Decoder for a desired Protobuf type with to_json & from_json methods."""
24+
to_json_fn_name = "_" + uuid.uuid4().hex
25+
from_json_fn_name = "_" + uuid.uuid4().hex
26+
setattr(_Printer, to_json_fn_name, to_json_object)
27+
setattr(_Parser, from_json_fn_name, from_json_object)
28+
_WKTJSONMETHODS[proto_type.DESCRIPTOR.full_name] = [
29+
to_json_fn_name,
30+
from_json_fn_name,
31+
]
32+
33+
34+
def _patch_feast_value_json_encoding():
35+
"""Patch Protobuf JSON Encoder / Decoder with a Feast Value type.
36+
37+
This allows encoding the proto object as a native type, without the dummy structural wrapper.
38+
39+
Here's a before example:
40+
41+
{
42+
"value_1": {
43+
"int64_val": 1
44+
},
45+
"value_2": {
46+
"double_list_val": [1.0, 2.0, 3.0]
47+
},
48+
}
49+
50+
And here's an after example:
51+
52+
{
53+
"value_1": 1,
54+
"value_2": [1.0, 2.0, 3.0]
55+
}
56+
"""
57+
58+
def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
59+
which = message.WhichOneof("val")
60+
# If the Value message is not set treat as null_value when serialize
61+
# to JSON. The parse back result will be different from original message.
62+
if which is None or which == "null_val":
63+
return None
64+
elif "_list_" in which:
65+
value = list(getattr(message, which).val)
66+
else:
67+
value = getattr(message, which)
68+
return value
69+
70+
def from_json_object(
71+
parser: _Parser, value: JsonObject, message: ProtoMessage
72+
) -> None:
73+
if value is None:
74+
message.null_val = 0
75+
elif isinstance(value, bool):
76+
message.bool_val = value
77+
elif isinstance(value, str):
78+
message.string_val = value
79+
elif isinstance(value, int):
80+
message.int64_val = value
81+
elif isinstance(value, float):
82+
message.double_val = value
83+
elif isinstance(value, list):
84+
if len(value) == 0:
85+
# Clear will mark the struct as modified so it will be created even if there are no values
86+
message.int64_list_val.Clear()
87+
elif isinstance(value[0], bool):
88+
message.bool_list_val.val.extend(value)
89+
elif isinstance(value[0], str):
90+
message.string_list_val.val.extend(value)
91+
elif isinstance(value[0], (float, int, type(None))):
92+
# Identify array as ints if all of the elements are ints
93+
if all(isinstance(item, int) for item in value):
94+
message.int64_list_val.val.extend(value)
95+
# If any of the elements are floats or nulls, then parse it as a float array
96+
else:
97+
# Convert each null as NaN.
98+
message.double_list_val.val.extend(
99+
[item if item is not None else float("nan") for item in value]
100+
)
101+
else:
102+
raise ParseError(
103+
"Value {0} has unexpected type {1}.".format(
104+
value[0], type(value[0])
105+
)
106+
)
107+
else:
108+
raise ParseError(
109+
"Value {0} has unexpected type {1}.".format(value, type(value))
110+
)
111+
112+
_patch_proto_json_encoding(Value, to_json_object, from_json_object)
113+
114+
115+
def _patch_feast_repeated_value_json_encoding():
116+
"""Patch Protobuf JSON Encoder / Decoder with a Feast RepeatedValue type.
117+
118+
This allows list of lists without dummy field name "val".
119+
120+
Here's a before example:
121+
122+
{
123+
"repeated_value": [
124+
{"val": [1,2,3]},
125+
{"val": [4,5,6]}
126+
]
127+
}
128+
129+
And here's an after example:
130+
131+
{
132+
"repeated_value": [
133+
[1,2,3],
134+
[4,5,6]
135+
]
136+
}
137+
"""
138+
139+
def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
140+
return [printer._MessageToJsonObject(item) for item in message.val]
141+
142+
def from_json_object(
143+
parser: _Parser, value: JsonObject, message: ProtoMessage
144+
) -> None:
145+
array = value if isinstance(value, list) else value["val"]
146+
for item in array:
147+
parser.ConvertMessage(item, message.val.add())
148+
149+
_patch_proto_json_encoding(RepeatedValue, to_json_object, from_json_object)
150+
151+
152+
def _patch_feast_feature_list_json_encoding():
153+
"""Patch Protobuf JSON Encoder / Decoder with a Feast FeatureList type.
154+
155+
This allows list of lists without dummy field name "features".
156+
157+
Here's a before example:
158+
159+
{
160+
"feature_list": {
161+
"features": [
162+
"feature-1",
163+
"feature-2",
164+
"feature-3"
165+
]
166+
}
167+
}
168+
169+
And here's an after example:
170+
171+
{
172+
"feature_list": [
173+
"feature-1",
174+
"feature-2",
175+
"feature-3"
176+
]
177+
}
178+
"""
179+
180+
def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
181+
return list(message.val)
182+
183+
def from_json_object(
184+
parser: _Parser, value: JsonObject, message: ProtoMessage
185+
) -> None:
186+
array = value if isinstance(value, list) else value["val"]
187+
message.val.extend(array)
188+
189+
_patch_proto_json_encoding(FeatureList, to_json_object, from_json_object)
190+
191+
192+
def patch():
193+
"""Patch Protobuf JSON Encoder / Decoder with all desired Feast types."""
194+
_patch_feast_value_json_encoding()
195+
_patch_feast_repeated_value_json_encoding()
196+
_patch_feast_feature_list_json_encoding()

sdk/python/feast/type_map.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,9 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
204204
return ProtoValue(
205205
int32_list_val=Int32List(
206206
val=[
207-
item if type(item) is np.int32 else _type_err(item, np.int32)
207+
item
208+
if type(item) in [np.int32, int]
209+
else _type_err(item, np.int32)
208210
for item in value
209211
]
210212
)
@@ -215,7 +217,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
215217
int64_list_val=Int64List(
216218
val=[
217219
item
218-
if type(item) in [np.int64, np.int32]
220+
if type(item) in [np.int64, np.int32, int]
219221
else _type_err(item, np.int64)
220222
for item in value
221223
]
@@ -227,7 +229,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
227229
int64_list_val=Int64List(
228230
val=[
229231
item
230-
if type(item) in [np.int64, np.int32]
232+
if type(item) in [np.int64, np.int32, int]
231233
else _type_err(item, np.int64)
232234
for item in value
233235
]
@@ -283,7 +285,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
283285
elif feast_value_type == ValueType.FLOAT:
284286
return ProtoValue(float_val=float(value))
285287
elif feast_value_type == ValueType.DOUBLE:
286-
assert type(value) is float or np.float64
288+
assert type(value) in [float, np.float64]
287289
return ProtoValue(double_val=value)
288290
elif feast_value_type == ValueType.STRING:
289291
return ProtoValue(string_val=str(value))

0 commit comments

Comments
 (0)