Skip to content

Commit 69a5a24

Browse files
committed
Add Feast Client, FeatureSets, Types to Python SDK
1 parent 2ca31fb commit 69a5a24

7 files changed

Lines changed: 378 additions & 219 deletions

File tree

sdk/python/feast/client.py

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,59 +14,96 @@
1414

1515

1616
import grpc
17-
from feast.constants import GRPC_CONNECTION_TIMEOUT
17+
1818
from feast.core.CoreService_pb2_grpc import CoreServiceStub
19-
from feast.core.CoreService_pb2 import GetFeastCoreVersionRequest
19+
from feast.core.CoreService_pb2 import GetFeatureSetsResponse
20+
from feast.feature_set import FeatureSet, Entity
2021
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
21-
from feast.serving.ServingService_pb2 import GetFeastServingVersionRequest
22+
from google.protobuf import empty_pb2 as Empty
23+
from typing import List
24+
from collections import OrderedDict
25+
from typing import Dict
26+
import os
27+
28+
GRPC_CONNECTION_TIMEOUT = 5 # type: int
29+
FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
30+
FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str
2231

2332

2433
class Client:
2534
def __init__(self, core_url: str, serving_url: str, verbose: bool = False):
35+
self._feature_sets = [] # type: List[FeatureSet]
2636
self._core_url = core_url
2737
self._serving_url = serving_url
2838
self._verbose = verbose
2939
self.__core_channel: grpc.Channel = None
3040
self.__serving_channel: grpc.Channel = None
3141
self._core_service_stub: CoreServiceStub = None
3242
self._serving_service_stub: ServingServiceStub = None
33-
34-
def info(self):
35-
pass
36-
37-
def apply(self):
38-
pass
43+
self._is_connected = False
44+
45+
@property
46+
def core_url(self) -> str:
47+
if self._core_url is not None:
48+
return self._core_url
49+
if os.getenv(FEAST_CORE_URL_ENV_KEY) is not None:
50+
return os.getenv(FEAST_CORE_URL_ENV_KEY)
51+
return ""
52+
53+
@core_url.setter
54+
def core_url(self, value: str):
55+
self._core_url = value
56+
57+
@property
58+
def serving_url(self) -> str:
59+
if self._serving_url is not None:
60+
return self._serving_url
61+
if os.getenv(FEAST_SERVING_URL_ENV_KEY) is not None:
62+
return os.getenv(FEAST_SERVING_URL_ENV_KEY)
63+
return ""
64+
65+
@serving_url.setter
66+
def serving_url(self, value: str):
67+
self._serving_url = value
68+
69+
def _connect_all(self):
70+
if not self.core_url and not self.serving_url:
71+
raise ValueError("Please set Core and Serving URL.")
72+
73+
if not self._is_connected:
74+
self._connect_core()
75+
self._connect_serving()
76+
self._is_connected = True
3977

4078
def version(self):
41-
self._connect_core()
42-
self._connect_serving()
79+
self._connect_all()
4380

4481
try:
4582
core_version = self._core_service_stub.GetFeastCoreVersion(
46-
GetFeastCoreVersionRequest(), timeout=GRPC_CONNECTION_TIMEOUT
83+
Empty, timeout=GRPC_CONNECTION_TIMEOUT
4784
).version
4885
except grpc.FutureCancelledError:
4986
core_version = "not connected"
5087

5188
try:
5289
serving_version = self._serving_service_stub.GetFeastServingVersion(
53-
GetFeastServingVersionRequest(), timeout=GRPC_CONNECTION_TIMEOUT
90+
Empty, timeout=GRPC_CONNECTION_TIMEOUT
5491
).version
5592
except grpc.FutureCancelledError:
5693
serving_version = "not connected"
5794

5895
return {
59-
"core": {"url": self._core_url, "version": core_version},
60-
"serving": {"url": self._serving_url, "version": serving_version},
96+
"core": {"url": self.core_url, "version": core_version},
97+
"serving": {"url": self.serving_url, "version": serving_version},
6198
}
6299

63100
def _connect_core(self):
64101
"""
65-
Connect to core api
102+
Connect to Core API
66103
"""
67104

68105
if self.__core_channel is None:
69-
self.__core_channel = grpc.insecure_channel(self._core_url)
106+
self.__core_channel = grpc.insecure_channel(self.core_url)
70107

71108
try:
72109
grpc.channel_ready_future(self.__core_channel).result(
@@ -75,18 +112,18 @@ def _connect_core(self):
75112
except grpc.FutureTimeoutError:
76113
raise ConnectionError(
77114
"connection timed out while attempting to connect to Feast Core gRPC server "
78-
+ self._core_url
115+
+ self.core_url
79116
)
80117
else:
81118
self._core_service_stub = CoreServiceStub(self.__core_channel)
82119

83120
def _connect_serving(self):
84121
"""
85-
Connect to serving api
122+
Connect to Serving API
86123
"""
87124

88125
if self.__serving_channel is None:
89-
self.__serving_channel = grpc.insecure_channel(self._serving_url)
126+
self.__serving_channel = grpc.insecure_channel(self.serving_url)
90127

91128
try:
92129
grpc.channel_ready_future(self.__serving_channel).result(
@@ -95,7 +132,32 @@ def _connect_serving(self):
95132
except grpc.FutureTimeoutError:
96133
raise ConnectionError(
97134
"connection timed out while attempting to connect to Feast Serving gRPC server "
98-
+ self._serving_url
135+
+ self.serving_url
99136
)
100137
else:
101138
self._serving_service_stub = ServingServiceStub(self.__serving_channel)
139+
140+
def apply(self, resource):
141+
if isinstance(resource, FeatureSet):
142+
self._apply_feature_set(resource)
143+
144+
def refresh(self):
145+
self._connect_all()
146+
fs = self._core_service_stub.GetFeatureSets() # type: GetFeatureSetsResponse
147+
self._feature_sets = fs.featureSets
148+
149+
@property
150+
def feature_sets(self) -> List[FeatureSet]:
151+
return self._feature_sets
152+
153+
@property
154+
def entities(self) -> Dict[str, Entity]:
155+
entities_dict = OrderedDict()
156+
for fs in self._feature_sets:
157+
for entityName, entity in fs.entities:
158+
entities_dict[entityName] = entity
159+
return entities_dict
160+
161+
def _apply_feature_set(self, resource):
162+
resource._client = self
163+
# TODO: Apply Feature Set to Feast Core

sdk/python/feast/core/FeatureSet_pb2.py

Lines changed: 9 additions & 105 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)