|
1 | 1 | import itertools |
2 | | -import time |
3 | | -from dataclasses import asdict, dataclass |
4 | | -from datetime import datetime, timedelta |
| 2 | +from datetime import datetime |
5 | 3 | from multiprocessing.pool import ThreadPool |
6 | 4 | from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union |
7 | 5 |
|
8 | 6 | import mmh3 |
9 | 7 | import pandas |
10 | 8 | import pyarrow |
11 | | -from google.cloud import bigquery |
12 | | -from jinja2 import BaseLoader, Environment |
13 | 9 |
|
14 | 10 | from feast import FeatureTable, utils |
15 | 11 | from feast.data_source import BigQuerySource |
16 | 12 | from feast.feature_view import FeatureView |
17 | 13 | from feast.infra.key_encoding_utils import serialize_entity_key |
| 14 | +from feast.infra.offline_stores.helpers import get_offline_store_from_sources |
18 | 15 | from feast.infra.provider import ( |
19 | 16 | Provider, |
20 | 17 | RetrievalJob, |
21 | 18 | _convert_arrow_to_proto, |
22 | 19 | _get_column_names, |
23 | | - _get_requested_feature_views_to_features_dict, |
24 | 20 | _run_field_mapping, |
25 | 21 | ) |
26 | 22 | from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto |
|
29 | 25 | from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig |
30 | 26 |
|
31 | 27 |
|
32 | | -class Gcp(Provider): |
| 28 | +class GcpProvider(Provider): |
33 | 29 | _gcp_project_id: Optional[str] |
34 | 30 |
|
35 | 31 | def __init__(self, config: Optional[DatastoreOnlineStoreConfig]): |
@@ -153,31 +149,16 @@ def materialize_single_feature_view( |
153 | 149 | start_date = utils.make_tzaware(start_date) |
154 | 150 | end_date = utils.make_tzaware(end_date) |
155 | 151 |
|
156 | | - from_expression = feature_view.input.get_table_query_string() |
157 | | - |
158 | | - partition_by_join_key_string = ", ".join(join_key_columns) |
159 | | - if partition_by_join_key_string != "": |
160 | | - partition_by_join_key_string = ( |
161 | | - "PARTITION BY " + partition_by_join_key_string |
162 | | - ) |
163 | | - timestamps = [event_timestamp_column] |
164 | | - if created_timestamp_column is not None: |
165 | | - timestamps.append(created_timestamp_column) |
166 | | - timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" |
167 | | - field_string = ", ".join(join_key_columns + feature_name_columns + timestamps) |
168 | | - |
169 | | - query = f""" |
170 | | - SELECT {field_string} |
171 | | - FROM ( |
172 | | - SELECT {field_string}, |
173 | | - ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row |
174 | | - FROM {from_expression} |
175 | | - WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') |
176 | | - ) |
177 | | - WHERE _feast_row = 1 |
178 | | - """ |
179 | | - |
180 | | - table = self._pull_query(query) |
| 152 | + offline_store = get_offline_store_from_sources([feature_view.input]) |
| 153 | + table = offline_store.pull_latest_from_table_or_query( |
| 154 | + data_source=feature_view.input, |
| 155 | + join_key_columns=join_key_columns, |
| 156 | + feature_name_columns=feature_name_columns, |
| 157 | + event_timestamp_column=event_timestamp_column, |
| 158 | + created_timestamp_column=created_timestamp_column, |
| 159 | + start_date=start_date, |
| 160 | + end_date=end_date, |
| 161 | + ) |
181 | 162 |
|
182 | 163 | if feature_view.input.field_mapping is not None: |
183 | 164 | table = _run_field_mapping(table, feature_view.input.field_mapping) |
@@ -205,31 +186,15 @@ def get_historical_features( |
205 | 186 | feature_refs: List[str], |
206 | 187 | entity_df: Union[pandas.DataFrame, str], |
207 | 188 | ) -> RetrievalJob: |
208 | | - # TODO: Add entity_df validation in order to fail before interacting with BigQuery |
209 | | - |
210 | | - if type(entity_df) is str: |
211 | | - entity_df_sql_table = f"({entity_df})" |
212 | | - elif isinstance(entity_df, pandas.DataFrame): |
213 | | - table_id = _upload_entity_df_into_bigquery(config.project, entity_df) |
214 | | - entity_df_sql_table = f"`{table_id}`" |
215 | | - else: |
216 | | - raise ValueError( |
217 | | - f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, " |
218 | | - f"but we found: {type(entity_df)} " |
219 | | - ) |
220 | | - |
221 | | - # Build a query context containing all information required to template the BigQuery SQL query |
222 | | - query_context = get_feature_view_query_context(feature_refs, feature_views) |
223 | | - |
224 | | - # TODO: Infer min_timestamp and max_timestamp from entity_df |
225 | | - # Generate the BigQuery SQL query from the query context |
226 | | - query = build_point_in_time_query( |
227 | | - query_context, |
228 | | - min_timestamp=datetime.now() - timedelta(days=365), |
229 | | - max_timestamp=datetime.now() + timedelta(days=1), |
230 | | - left_table_query_string=entity_df_sql_table, |
| 189 | + offline_store = get_offline_store_from_sources( |
| 190 | + [feature_view.input for feature_view in feature_views] |
| 191 | + ) |
| 192 | + job = offline_store.get_historical_features( |
| 193 | + config=config, |
| 194 | + feature_views=feature_views, |
| 195 | + feature_refs=feature_refs, |
| 196 | + entity_df=entity_df, |
231 | 197 | ) |
232 | | - job = BigQueryRetrievalJob(query=query) |
233 | 198 | return job |
234 | 199 |
|
235 | 200 |
|
@@ -342,236 +307,3 @@ def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: |
342 | 307 | do with the Entity concept we have in Feast. |
343 | 308 | """ |
344 | 309 | return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() |
345 | | - |
346 | | - |
class BigQueryRetrievalJob(RetrievalJob):
    """Retrieval job backed by a single BigQuery SQL query.

    Constructing the job only stores the query text; the query is submitted
    to BigQuery when ``to_df`` is called.
    """

    def __init__(self, query):
        # SQL text that will be executed when the result is requested.
        self.query = query

    def to_df(self):
        """Execute the stored query and return the result as a dataframe."""
        # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
        query_job = bigquery.Client().query(self.query)
        return query_job.to_dataframe(create_bqstorage_client=True)
356 | | - |
357 | | - |
@dataclass(frozen=True)
class FeatureViewQueryContext:
    """Context object used to template a BigQuery point-in-time SQL query.

    One instance describes one feature view. Instances are converted to plain
    dicts and handed to the Jinja template, so the field names below are the
    names the template refers to.
    """

    # Name of the feature view; used as a prefix for CTE and column names.
    name: str
    # Time-to-live in whole seconds; 0 disables the TTL filter in the template.
    ttl: int
    # Entity (join key) column names of the feature view.
    entities: List[str]
    features: List[str]  # feature reference format
    # NOTE(review): presumably None when the source is defined by `query`
    # instead of a table reference - confirm against BigQuerySource.
    table_ref: Optional[str]
    event_timestamp_column: str
    # TODO: Make created column optional and not hardcoded in the template.
    created_timestamp_column: Optional[str]
    # Source-column to feature-column renames; None when no mapping is set.
    field_mapping: Optional[Dict[str, str]]
    # NOTE(review): presumably None when the source is defined by `table_ref`.
    query: Optional[str]
    # Expression placed after FROM in the generated SQL.
    table_subquery: str
372 | | - |
373 | | - |
def _upload_entity_df_into_bigquery(project, entity_df) -> str:
    """Upload a Pandas entity dataframe into a BigQuery table and return a reference to the resulting table.

    The table is created inside the ``feast_<project>`` dataset of the BigQuery
    client's project. The dataset is created in location "US" if it does not
    exist yet, and the table is set to expire 30 minutes after the upload.

    Args:
        project: Feast project name; used to build the dataset name.
        entity_df: Pandas dataframe to upload. It is left unmodified.

    Returns:
        The table ID in the form
        ``<client project>.feast_<project>.entity_df_<timestamp>_<uuid hex>``.
    """
    import uuid  # local import: only needed for the unique table suffix

    client = bigquery.Client()

    # First create the BigQuery dataset if it doesn't exist
    dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
    dataset.location = "US"
    client.create_dataset(
        dataset, exists_ok=True
    )  # TODO: Consider moving this to apply or BigQueryOfflineStore

    # Drop the index so that we don't have unnecessary columns. A new frame is
    # built instead of resetting in place so the caller's dataframe keeps its
    # index.
    upload_df = entity_df.reset_index(drop=True)

    # Upload the dataframe into BigQuery, creating a temporary table. The
    # timestamp alone only has one-second resolution, so a random suffix is
    # added to keep uploads in the same second from targeting the same table.
    job_config = bigquery.LoadJobConfig()
    table_id = (
        f"{client.project}.feast_{project}"
        f".entity_df_{int(time.time())}_{uuid.uuid4().hex}"
    )
    job = client.load_table_from_dataframe(upload_df, table_id, job_config=job_config)
    job.result()

    # Ensure that the table expires after some time
    table = client.get_table(table=table_id)
    table.expires = datetime.utcnow() + timedelta(minutes=30)
    client.update_table(table, ["expires"])

    return table_id
400 | | - |
401 | | - |
def get_feature_view_query_context(
    feature_refs: List[str], feature_views: List[FeatureView]
) -> List[FeatureViewQueryContext]:
    """Build a query context containing all information required to template a BigQuery point-in-time SQL query

    Produces one ``FeatureViewQueryContext`` per entry of the mapping returned
    by ``_get_requested_feature_views_to_features_dict``, in that mapping's
    iteration order.

    Raises:
        AssertionError: if a feature view's input is not a ``BigQuerySource``.
    """
    requested = _get_requested_feature_views_to_features_dict(
        feature_refs, feature_views
    )

    contexts = []
    for view, view_features in requested.items():
        entity_names = list(view.entities)

        # A ttl that is not a timedelta is encoded as 0, which the template
        # treats as "no TTL filter".
        ttl_seconds = (
            int(view.ttl.total_seconds()) if isinstance(view.ttl, timedelta) else 0
        )

        assert isinstance(view.input, BigQuerySource)
        source = view.input

        contexts.append(
            FeatureViewQueryContext(
                name=view.name,
                ttl=ttl_seconds,
                entities=entity_names,
                features=view_features,
                table_ref=source.table_ref,
                event_timestamp_column=source.event_timestamp_column,
                created_timestamp_column=source.created_timestamp_column,
                # TODO: Make created column optional and not hardcoded
                field_mapping=source.field_mapping,
                query=source.query,
                table_subquery=source.get_table_query_string(),
            )
        )
    return contexts
437 | | - |
438 | | - |
def build_point_in_time_query(
    feature_view_query_contexts: List[FeatureViewQueryContext],
    min_timestamp: datetime,
    max_timestamp: datetime,
    left_table_query_string: str,
):
    """Build point-in-time query between each feature view table and the entity dataframe

    Renders ``SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN`` with the timestamp
    bounds, the entity table expression and one dict per feature view context,
    and returns the rendered SQL text.
    """
    environment = Environment(loader=BaseLoader())
    sql_template = environment.from_string(
        source=SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
    )

    # The template sees each context as a plain dict under "featureviews".
    featureviews = [asdict(ctx) for ctx in feature_view_query_contexts]

    return sql_template.render(
        {
            "min_timestamp": min_timestamp,
            "max_timestamp": max_timestamp,
            "left_table_query_string": left_table_query_string,
            "featureviews": featureviews,
        }
    )
460 | | - |
461 | | - |
462 | | -# TODO: Optimizations |
463 | | -# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly |
464 | | -# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe |
465 | | -# * Create temporary tables instead of keeping all tables in memory |
466 | | - |
467 | | -SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ |
468 | | -WITH entity_dataframe AS ( |
469 | | - SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf |
470 | | -), |
471 | | -{% for featureview in featureviews %} |
472 | | -/* |
473 | | - This query template performs the point-in-time correctness join for a single feature set table |
474 | | - to the provided entity table. |
475 | | - 1. Concatenate the timestamp and entities from the feature set table with the entity dataset. |
476 | | - Feature values are joined to this table later for improved efficiency. |
477 | | - featureview_timestamp is equal to null in rows from the entity dataset. |
478 | | - */ |
479 | | -{{ featureview.name }}__union_features AS ( |
480 | | -SELECT |
481 | | - -- unique identifier for each row in the entity dataset. |
482 | | - row_number, |
483 | | - -- event_timestamp contains the timestamps to join onto |
484 | | - event_timestamp, |
485 | | - -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp |
486 | | - NULL as {{ featureview.name }}_feature_timestamp, |
487 | | - -- created timestamp of the feature at the corresponding feature_timestamp |
488 | | - NULL as created_timestamp, |
489 | | - -- select only entities belonging to this feature set |
490 | | - {{ featureview.entities | join(', ')}}, |
491 | | - -- boolean for filtering the dataset later |
492 | | - true AS is_entity_table |
493 | | -FROM entity_dataframe |
494 | | -UNION ALL |
495 | | -SELECT |
496 | | - NULL as row_number, |
497 | | - {{ featureview.event_timestamp_column }} as event_timestamp, |
498 | | - {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp, |
499 | | - {{ featureview.created_timestamp_column }} as created_timestamp, |
500 | | - {{ featureview.entities | join(', ')}}, |
501 | | - false AS is_entity_table |
502 | | -FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}' |
503 | | -{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %} |
504 | | -), |
505 | | -/* |
506 | | - 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as |
507 | | - well as is_entity_table. |
508 | | - Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps |
509 | | - in the rows from the entity table should now contain the latest timestamps relative to the row's |
510 | | - event_timestamp. |
511 | | - For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the |
512 | | - feature_timestamp to null. |
513 | | - */ |
514 | | -{{ featureview.name }}__joined AS ( |
515 | | -SELECT |
516 | | - row_number, |
517 | | - event_timestamp, |
518 | | - {{ featureview.entities | join(', ')}}, |
519 | | - {% for feature in featureview.features %} |
520 | | - IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} |
521 | | - {% endfor %} |
522 | | -FROM ( |
523 | | -SELECT |
524 | | - row_number, |
525 | | - event_timestamp, |
526 | | - {{ featureview.entities | join(', ')}}, |
527 | | - FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp, |
528 | | - FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp, |
529 | | - is_entity_table |
530 | | -FROM {{ featureview.name }}__union_features |
531 | | -WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) |
532 | | -) |
533 | | -/* |
534 | | - 3. Select only the rows from the entity table, and join the features from the original feature set table |
535 | | - to the dataset using the entity values, feature_timestamp, and created_timestamps. |
536 | | - */ |
537 | | -LEFT JOIN ( |
538 | | -SELECT |
539 | | - {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp, |
540 | | - {{ featureview.created_timestamp_column }} as created_timestamp, |
541 | | - {{ featureview.entities | join(', ')}}, |
542 | | - {% for feature in featureview.features %} |
543 | | - {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} |
544 | | - {% endfor %} |
545 | | -FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}' |
546 | | -{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %} |
547 | | -) USING ({{ featureview.name }}_feature_timestamp, created_timestamp, {{ featureview.entities | join(', ')}}) |
548 | | -WHERE is_entity_table |
549 | | -), |
550 | | -/* |
551 | | - 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number. |
552 | | - */ |
553 | | -{{ featureview.name }}__deduped AS (SELECT |
554 | | - k.* |
555 | | -FROM ( |
556 | | - SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k |
557 | | - FROM {{ featureview.name }}__joined row |
558 | | - GROUP BY row_number |
559 | | -)){% if loop.last %}{% else %}, {% endif %} |
560 | | -
|
561 | | -{% endfor %} |
562 | | -/* |
563 | | - Joins the outputs of multiple time travel joins to a single table. |
564 | | - */ |
565 | | -SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf |
566 | | -{% for featureview in featureviews %} |
567 | | -LEFT JOIN ( |
568 | | - SELECT |
569 | | - row_number, |
570 | | - {% for feature in featureview.features %} |
571 | | - {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} |
572 | | - {% endfor %} |
573 | | - FROM {{ featureview.name }}__deduped |
574 | | -) USING (row_number) |
575 | | -{% endfor %} |
576 | | -ORDER BY event_timestamp |
577 | | -""" |
0 commit comments