1414
1515
1616import grpc
17- from feast . constants import GRPC_CONNECTION_TIMEOUT
17+
1818from feast .core .CoreService_pb2_grpc import CoreServiceStub
19- from feast .core .CoreService_pb2 import GetFeastCoreVersionRequest
19+ from feast .core .CoreService_pb2 import GetFeatureSetsResponse
20+ from feast .feature_set import FeatureSet , Entity
2021from feast .serving .ServingService_pb2_grpc import ServingServiceStub
21- from feast .serving .ServingService_pb2 import GetFeastServingVersionRequest
22+ from google .protobuf import empty_pb2 as Empty
23+ from typing import List
24+ from collections import OrderedDict
25+ from typing import Dict
26+ import os
27+
28+ GRPC_CONNECTION_TIMEOUT = 5 # type: int
29+ FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
30+ FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str
2231
2332
2433class Client :
2534 def __init__ (self , core_url : str , serving_url : str , verbose : bool = False ):
35+ self ._feature_sets = [] # type: List[FeatureSet]
2636 self ._core_url = core_url
2737 self ._serving_url = serving_url
2838 self ._verbose = verbose
2939 self .__core_channel : grpc .Channel = None
3040 self .__serving_channel : grpc .Channel = None
3141 self ._core_service_stub : CoreServiceStub = None
3242 self ._serving_service_stub : ServingServiceStub = None
33-
34- def info (self ):
35- pass
36-
37- def apply (self ):
38- pass
43+ self ._is_connected = False
44+
45+ @property
46+ def core_url (self ) -> str :
47+ if self ._core_url is not None :
48+ return self ._core_url
49+ if os .getenv (FEAST_CORE_URL_ENV_KEY ) is not None :
50+ return os .getenv (FEAST_CORE_URL_ENV_KEY )
51+ return ""
52+
53+ @core_url .setter
54+ def core_url (self , value : str ):
55+ self ._core_url = value
56+
57+ @property
58+ def serving_url (self ) -> str :
59+ if self ._serving_url is not None :
60+ return self ._serving_url
61+ if os .getenv (FEAST_SERVING_URL_ENV_KEY ) is not None :
62+ return os .getenv (FEAST_SERVING_URL_ENV_KEY )
63+ return ""
64+
65+ @serving_url .setter
66+ def serving_url (self , value : str ):
67+ self ._serving_url = value
68+
69+ def _connect_all (self ):
70+ if not self .core_url and not self .serving_url :
71+ raise ValueError ("Please set Core and Serving URL." )
72+
73+ if not self ._is_connected :
74+ self ._connect_core ()
75+ self ._connect_serving ()
76+ self ._is_connected = True
3977
4078 def version (self ):
41- self ._connect_core ()
42- self ._connect_serving ()
79+ self ._connect_all ()
4380
4481 try :
4582 core_version = self ._core_service_stub .GetFeastCoreVersion (
46- GetFeastCoreVersionRequest () , timeout = GRPC_CONNECTION_TIMEOUT
83+ Empty , timeout = GRPC_CONNECTION_TIMEOUT
4784 ).version
4885 except grpc .FutureCancelledError :
4986 core_version = "not connected"
5087
5188 try :
5289 serving_version = self ._serving_service_stub .GetFeastServingVersion (
53- GetFeastServingVersionRequest () , timeout = GRPC_CONNECTION_TIMEOUT
90+ Empty , timeout = GRPC_CONNECTION_TIMEOUT
5491 ).version
5592 except grpc .FutureCancelledError :
5693 serving_version = "not connected"
5794
5895 return {
59- "core" : {"url" : self ._core_url , "version" : core_version },
60- "serving" : {"url" : self ._serving_url , "version" : serving_version },
96+ "core" : {"url" : self .core_url , "version" : core_version },
97+ "serving" : {"url" : self .serving_url , "version" : serving_version },
6198 }
6299
63100 def _connect_core (self ):
64101 """
65- Connect to core api
102+ Connect to Core API
66103 """
67104
68105 if self .__core_channel is None :
69- self .__core_channel = grpc .insecure_channel (self ._core_url )
106+ self .__core_channel = grpc .insecure_channel (self .core_url )
70107
71108 try :
72109 grpc .channel_ready_future (self .__core_channel ).result (
@@ -75,18 +112,18 @@ def _connect_core(self):
75112 except grpc .FutureTimeoutError :
76113 raise ConnectionError (
77114 "connection timed out while attempting to connect to Feast Core gRPC server "
78- + self ._core_url
115+ + self .core_url
79116 )
80117 else :
81118 self ._core_service_stub = CoreServiceStub (self .__core_channel )
82119
83120 def _connect_serving (self ):
84121 """
85- Connect to serving api
122+ Connect to Serving API
86123 """
87124
88125 if self .__serving_channel is None :
89- self .__serving_channel = grpc .insecure_channel (self ._serving_url )
126+ self .__serving_channel = grpc .insecure_channel (self .serving_url )
90127
91128 try :
92129 grpc .channel_ready_future (self .__serving_channel ).result (
@@ -95,7 +132,32 @@ def _connect_serving(self):
95132 except grpc .FutureTimeoutError :
96133 raise ConnectionError (
97134 "connection timed out while attempting to connect to Feast Serving gRPC server "
98- + self ._serving_url
135+ + self .serving_url
99136 )
100137 else :
101138 self ._serving_service_stub = ServingServiceStub (self .__serving_channel )
139+
140+ def apply (self , resource ):
141+ if isinstance (resource , FeatureSet ):
142+ self ._apply_feature_set (resource )
143+
144+ def refresh (self ):
145+ self ._connect_all ()
146+ fs = self ._core_service_stub .GetFeatureSets () # type: GetFeatureSetsResponse
147+ self ._feature_sets = fs .featureSets
148+
149+ @property
150+ def feature_sets (self ) -> List [FeatureSet ]:
151+ return self ._feature_sets
152+
153+ @property
154+ def entities (self ) -> Dict [str , Entity ]:
155+ entities_dict = OrderedDict ()
156+ for fs in self ._feature_sets :
157+ for entityName , entity in fs .entities :
158+ entities_dict [entityName ] = entity
159+ return entities_dict
160+
161+ def _apply_feature_set (self , resource ):
162+ resource ._client = self
163+ # TODO: Apply Feature Set to Feast Core
0 commit comments