1- import itertools
21from datetime import datetime
3- from multiprocessing .pool import ThreadPool
4- from typing import Any , Callable , Dict , Iterator , List , Optional , Sequence , Tuple , Union
2+ from typing import Any , Callable , Dict , List , Optional , Sequence , Tuple , Union
53
6- import mmh3
74import pandas
85from tqdm import tqdm
96
10- from feast import FeatureTable , utils
7+ from feast import FeatureTable
118from feast .entity import Entity
12- from feast .errors import FeastProviderLoginError
139from feast .feature_view import FeatureView
14- from feast .infra .key_encoding_utils import serialize_entity_key
1510from feast .infra .offline_stores .helpers import get_offline_store_from_config
11+ from feast .infra .online_stores .helpers import get_online_store_from_config
1612from feast .infra .provider import (
1713 Provider ,
1814 RetrievalJob ,
2319from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
2420from feast .protos .feast .types .Value_pb2 import Value as ValueProto
2521from feast .registry import Registry
26- from feast .repo_config import DatastoreOnlineStoreConfig , RepoConfig
27-
28- try :
29- from google .auth .exceptions import DefaultCredentialsError
30- from google .cloud import datastore
31- except ImportError as e :
32- from feast .errors import FeastExtrasDependencyImportError
33-
34- raise FeastExtrasDependencyImportError ("gcp" , str (e ))
22+ from feast .repo_config import RepoConfig
3523
3624
3725class GcpProvider (Provider ):
3826 _gcp_project_id : Optional [str ]
3927 _namespace : Optional [str ]
4028
4129 def __init__ (self , config : RepoConfig ):
42- assert isinstance (config .online_store , DatastoreOnlineStoreConfig )
43- self ._gcp_project_id = config .online_store .project_id
44- self ._namespace = config .online_store .namespace
45- self ._write_concurrency = config .online_store .write_concurrency
46- self ._write_batch_size = config .online_store .write_batch_size
47-
48- assert config .offline_store is not None
30+ self .repo_config = config
4931 self .offline_store = get_offline_store_from_config (config .offline_store )
50-
51- def _initialize_client (self ):
52- try :
53- return datastore .Client (
54- project = self ._gcp_project_id , namespace = self ._namespace
55- )
56- except DefaultCredentialsError as e :
57- raise FeastProviderLoginError (
58- str (e )
59- + '\n It may be necessary to run "gcloud auth application-default login" if you would like to use your '
60- "local Google Cloud account "
61- )
32+ self .online_store = get_online_store_from_config (config .online_store )
6233
6334 def update_infra (
6435 self ,
@@ -69,85 +40,43 @@ def update_infra(
6940 entities_to_keep : Sequence [Entity ],
7041 partial : bool ,
7142 ):
72-
73- client = self ._initialize_client ()
74-
75- for table in tables_to_keep :
76- key = client .key ("Project" , project , "Table" , table .name )
77- entity = datastore .Entity (
78- key = key , exclude_from_indexes = ("created_ts" , "event_ts" , "values" )
79- )
80- entity .update ({"created_ts" : datetime .utcnow ()})
81- client .put (entity )
82-
83- for table in tables_to_delete :
84- _delete_all_values (
85- client , client .key ("Project" , project , "Table" , table .name )
86- )
87-
88- # Delete the table metadata datastore entity
89- key = client .key ("Project" , project , "Table" , table .name )
90- client .delete (key )
43+ self .online_store .update (
44+ config = self .repo_config ,
45+ tables_to_delete = tables_to_delete ,
46+ tables_to_keep = tables_to_keep ,
47+ entities_to_keep = entities_to_keep ,
48+ entities_to_delete = entities_to_delete ,
49+ partial = partial ,
50+ )
9151
9252 def teardown_infra (
9353 self ,
9454 project : str ,
9555 tables : Sequence [Union [FeatureTable , FeatureView ]],
9656 entities : Sequence [Entity ],
9757 ) -> None :
98- client = self ._initialize_client ()
99-
100- for table in tables :
101- _delete_all_values (
102- client , client .key ("Project" , project , "Table" , table .name )
103- )
104-
105- # Delete the table metadata datastore entity
106- key = client .key ("Project" , project , "Table" , table .name )
107- client .delete (key )
58+ self .online_store .teardown (self .repo_config , tables , entities )
10859
10960 def online_write_batch (
11061 self ,
111- project : str ,
62+ config : RepoConfig ,
11263 table : Union [FeatureTable , FeatureView ],
11364 data : List [
11465 Tuple [EntityKeyProto , Dict [str , ValueProto ], datetime , Optional [datetime ]]
11566 ],
11667 progress : Optional [Callable [[int ], Any ]],
11768 ) -> None :
118- client = self ._initialize_client ()
119-
120- pool = ThreadPool (processes = self ._write_concurrency )
121- pool .map (
122- lambda b : _write_minibatch (client , project , table , b , progress ),
123- _to_minibatches (data , batch_size = self ._write_batch_size ),
124- )
69+ self .online_store .online_write_batch (config , table , data , progress )
12570
12671 def online_read (
12772 self ,
128- project : str ,
73+ config : RepoConfig ,
12974 table : Union [FeatureTable , FeatureView ],
13075 entity_keys : List [EntityKeyProto ],
13176 requested_features : List [str ] = None ,
13277 ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
133- client = self ._initialize_client ( )
78+ result = self .online_store . online_read ( config , table , entity_keys )
13479
135- result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
136- for entity_key in entity_keys :
137- document_id = compute_datastore_entity_id (entity_key )
138- key = client .key (
139- "Project" , project , "Table" , table .name , "Row" , document_id
140- )
141- value = client .get (key )
142- if value is not None :
143- res = {}
144- for feature_name , value_bin in value ["values" ].items ():
145- val = ValueProto ()
146- val .ParseFromString (value_bin )
147- res [feature_name ] = val
148- result .append ((value ["event_ts" ], res ))
149- else :
150- result .append ((None , None ))
15180 return result
15281
15382 def materialize_single_feature_view (
@@ -188,7 +117,7 @@ def materialize_single_feature_view(
188117
189118 with tqdm_builder (len (rows_to_write )) as pbar :
190119 self .online_write_batch (
191- project , feature_view , rows_to_write , lambda x : pbar .update (x )
120+ self . repo_config , feature_view , rows_to_write , lambda x : pbar .update (x )
192121 )
193122
194123 def get_historical_features (
@@ -209,84 +138,3 @@ def get_historical_features(
209138 project = project ,
210139 )
211140 return job
212-
213-
214- ProtoBatch = Sequence [
215- Tuple [EntityKeyProto , Dict [str , ValueProto ], datetime , Optional [datetime ]]
216- ]
217-
218-
219- def _to_minibatches (data : ProtoBatch , batch_size ) -> Iterator [ProtoBatch ]:
220- """
221- Split data into minibatches, making sure we stay under GCP datastore transaction size
222- limits.
223- """
224- iterable = iter (data )
225-
226- while True :
227- batch = list (itertools .islice (iterable , batch_size ))
228- if len (batch ) > 0 :
229- yield batch
230- else :
231- break
232-
233-
def _write_minibatch(
    client,
    project: str,
    table: Union[FeatureTable, FeatureView],
    data: Sequence[
        Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
    ],
    progress: Optional[Callable[[int], Any]],
):
    """Persist one minibatch of feature rows to Datastore atomically.

    Each row becomes a Datastore entity under the key path
    Project/<project>/Table/<table.name>/Row/<hashed entity key>, with the
    serialized key/values and (tz-aware) timestamps stored unindexed.
    Calls ``progress`` with the number of rows written, if provided.
    """
    rows = []
    for entity_key, features, timestamp, created_ts in data:
        row_key = client.key(
            "Project",
            project,
            "Table",
            table.name,
            "Row",
            compute_datastore_entity_id(entity_key),
        )
        row = datastore.Entity(
            key=row_key, exclude_from_indexes=("created_ts", "event_ts", "values")
        )
        row.update(
            {
                "key": entity_key.SerializeToString(),
                "values": {
                    name: value.SerializeToString()
                    for name, value in features.items()
                },
                "event_ts": utils.make_tzaware(timestamp),
                "created_ts": (
                    None if created_ts is None else utils.make_tzaware(created_ts)
                ),
            }
        )
        rows.append(row)

    # Commit the whole minibatch in a single transaction.
    with client.transaction():
        client.put_multi(rows)

    if progress:
        progress(len(rows))
269-
270-
def _delete_all_values(client, key) -> None:
    """Delete every "Row" entity stored under the given ancestor key.

    Fetches and deletes in pages of 1000 until the ancestor query comes
    back empty.
    """
    while True:
        page = list(client.query(kind="Row", ancestor=key).fetch(limit=1000))
        if not page:
            return
        for row in page:
            client.delete(row.key)
283-
284-
def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
    """Return the Datastore entity id for a Feast entity key.

    The id is the hex form of a murmur3 hash of the serialized entity key.
    Note that a Datastore "Entity" is a concept of the Datastore data model
    and has nothing to do with Feast's Entity concept.
    """
    serialized = serialize_entity_key(entity_key)
    return mmh3.hash_bytes(serialized).hex()
0 commit comments