
Commit 22bf06c

Authored by Willem Pienaar
Add multi-source support to local and GCP provider (feast-dev#1454)
* Add multi-source support to local and GCP provider
  Signed-off-by: Willem Pienaar <git@willem.co>
* Remove commented out code
  Signed-off-by: Willem Pienaar <git@willem.co>
1 parent e95700e commit 22bf06c

9 files changed

Lines changed: 680 additions & 455 deletions

File tree

sdk/python/feast/infra/gcp.py

Lines changed: 21 additions & 289 deletions
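
At a high level, this file's change swaps the provider's inlined BigQuery SQL for dispatch through an offline store interface. A minimal sketch of the new call pattern, using only names visible in the diff below; the wrapper function and its argument plumbing are illustrative, not part of the commit:

    from feast.infra.offline_stores.helpers import get_offline_store_from_sources

    def materialize_latest(feature_view, join_key_columns, feature_name_columns,
                           event_timestamp_column, created_timestamp_column,
                           start_date, end_date):
        # Resolve the offline store implementation from the view's data source.
        offline_store = get_offline_store_from_sources([feature_view.input])
        # Delegate the "latest value per entity key" pull to that store.
        return offline_store.pull_latest_from_table_or_query(
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

Historical retrieval follows the same pattern in the diff: collect the inputs of all requested feature views, resolve one offline store for them, and call its get_historical_features.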
@@ -1,26 +1,22 @@
 import itertools
-import time
-from dataclasses import asdict, dataclass
-from datetime import datetime, timedelta
+from datetime import datetime
 from multiprocessing.pool import ThreadPool
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
 
 import mmh3
 import pandas
 import pyarrow
-from google.cloud import bigquery
-from jinja2 import BaseLoader, Environment
 
 from feast import FeatureTable, utils
 from feast.data_source import BigQuerySource
 from feast.feature_view import FeatureView
 from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.offline_stores.helpers import get_offline_store_from_sources
 from feast.infra.provider import (
     Provider,
     RetrievalJob,
     _convert_arrow_to_proto,
     _get_column_names,
-    _get_requested_feature_views_to_features_dict,
     _run_field_mapping,
 )
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -29,7 +25,7 @@
 from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
 
 
-class Gcp(Provider):
+class GcpProvider(Provider):
     _gcp_project_id: Optional[str]
 
     def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
@@ -153,31 +149,16 @@ def materialize_single_feature_view(
         start_date = utils.make_tzaware(start_date)
         end_date = utils.make_tzaware(end_date)
 
-        from_expression = feature_view.input.get_table_query_string()
-
-        partition_by_join_key_string = ", ".join(join_key_columns)
-        if partition_by_join_key_string != "":
-            partition_by_join_key_string = (
-                "PARTITION BY " + partition_by_join_key_string
-            )
-        timestamps = [event_timestamp_column]
-        if created_timestamp_column is not None:
-            timestamps.append(created_timestamp_column)
-        timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
-        field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
-
-        query = f"""
-        SELECT {field_string}
-        FROM (
-            SELECT {field_string},
-            ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
-            FROM {from_expression}
-            WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
-        )
-        WHERE _feast_row = 1
-        """
-
-        table = self._pull_query(query)
+        offline_store = get_offline_store_from_sources([feature_view.input])
+        table = offline_store.pull_latest_from_table_or_query(
+            data_source=feature_view.input,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
+        )
 
         if feature_view.input.field_mapping is not None:
             table = _run_field_mapping(table, feature_view.input.field_mapping)
@@ -205,31 +186,15 @@ def get_historical_features(
         feature_refs: List[str],
         entity_df: Union[pandas.DataFrame, str],
     ) -> RetrievalJob:
-        # TODO: Add entity_df validation in order to fail before interacting with BigQuery
-
-        if type(entity_df) is str:
-            entity_df_sql_table = f"({entity_df})"
-        elif isinstance(entity_df, pandas.DataFrame):
-            table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
-            entity_df_sql_table = f"`{table_id}`"
-        else:
-            raise ValueError(
-                f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
-                f"but we found: {type(entity_df)} "
-            )
-
-        # Build a query context containing all information required to template the BigQuery SQL query
-        query_context = get_feature_view_query_context(feature_refs, feature_views)
-
-        # TODO: Infer min_timestamp and max_timestamp from entity_df
-        # Generate the BigQuery SQL query from the query context
-        query = build_point_in_time_query(
-            query_context,
-            min_timestamp=datetime.now() - timedelta(days=365),
-            max_timestamp=datetime.now() + timedelta(days=1),
-            left_table_query_string=entity_df_sql_table,
+        offline_store = get_offline_store_from_sources(
+            [feature_view.input for feature_view in feature_views]
+        )
+        job = offline_store.get_historical_features(
+            config=config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
         )
-        job = BigQueryRetrievalJob(query=query)
         return job
 
 
@@ -342,236 +307,3 @@ def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
     do with the Entity concept we have in Feast.
     """
     return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
-
-
-class BigQueryRetrievalJob(RetrievalJob):
-    def __init__(self, query):
-        self.query = query
-
-    def to_df(self):
-        # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
-        client = bigquery.Client()
-        df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
-        return df
-
-
-@dataclass(frozen=True)
-class FeatureViewQueryContext:
-    """Context object used to template a BigQuery point-in-time SQL query"""
-
-    name: str
-    ttl: int
-    entities: List[str]
-    features: List[str]  # feature reference format
-    table_ref: str
-    event_timestamp_column: str
-    created_timestamp_column: str
-    field_mapping: Dict[str, str]
-    query: str
-    table_subquery: str
-
-
-def _upload_entity_df_into_bigquery(project, entity_df) -> str:
-    """Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
-    client = bigquery.Client()
-
-    # First create the BigQuery dataset if it doesn't exist
-    dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
-    dataset.location = "US"
-    client.create_dataset(
-        dataset, exists_ok=True
-    )  # TODO: Consider moving this to apply or BigQueryOfflineStore
-
-    # Drop the index so that we dont have unnecessary columns
-    entity_df.reset_index(drop=True, inplace=True)
-
-    # Upload the dataframe into BigQuery, creating a temporary table
-    job_config = bigquery.LoadJobConfig()
-    table_id = f"{client.project}.feast_{project}.entity_df_{int(time.time())}"
-    job = client.load_table_from_dataframe(entity_df, table_id, job_config=job_config,)
-    job.result()
-
-    # Ensure that the table expires after some time
-    table = client.get_table(table=table_id)
-    table.expires = datetime.utcnow() + timedelta(minutes=30)
-    client.update_table(table, ["expires"])
-
-    return table_id
-
-
-def get_feature_view_query_context(
-    feature_refs: List[str], feature_views: List[FeatureView]
-) -> List[FeatureViewQueryContext]:
-    """Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
-
-    feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
-        feature_refs, feature_views
-    )
-
-    query_context = []
-    for feature_view, features in feature_views_to_feature_map.items():
-        entity_names = [entity for entity in feature_view.entities]
-
-        if isinstance(feature_view.ttl, timedelta):
-            ttl_seconds = int(feature_view.ttl.total_seconds())
-        else:
-            ttl_seconds = 0
-
-        assert isinstance(feature_view.input, BigQuerySource)
-
-        context = FeatureViewQueryContext(
-            name=feature_view.name,
-            ttl=ttl_seconds,
-            entities=entity_names,
-            features=features,
-            table_ref=feature_view.input.table_ref,
-            event_timestamp_column=feature_view.input.event_timestamp_column,
-            created_timestamp_column=feature_view.input.created_timestamp_column,
-            # TODO: Make created column optional and not hardcoded
-            field_mapping=feature_view.input.field_mapping,
-            query=feature_view.input.query,
-            table_subquery=feature_view.input.get_table_query_string(),
-        )
-        query_context.append(context)
-    return query_context
-
-
-def build_point_in_time_query(
-    feature_view_query_contexts: List[FeatureViewQueryContext],
-    min_timestamp: datetime,
-    max_timestamp: datetime,
-    left_table_query_string: str,
-):
-    """Build point-in-time query between each feature view table and the entity dataframe"""
-    template = Environment(loader=BaseLoader()).from_string(
-        source=SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
-    )
-
-    # Add additional fields to dict
-    template_context = {
-        "min_timestamp": min_timestamp,
-        "max_timestamp": max_timestamp,
-        "left_table_query_string": left_table_query_string,
-        "featureviews": [asdict(context) for context in feature_view_query_contexts],
-    }
-
-    query = template.render(template_context)
-    return query
-
-
-# TODO: Optimizations
-# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
-# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
-# * Create temporary tables instead of keeping all tables in memory
-
-SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
-WITH entity_dataframe AS (
-    SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf
-),
-{% for featureview in featureviews %}
-/*
- This query template performs the point-in-time correctness join for a single feature set table
- to the provided entity table.
- 1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
- Feature values are joined to this table later for improved efficiency.
- featureview_timestamp is equal to null in rows from the entity dataset.
- */
-{{ featureview.name }}__union_features AS (
-SELECT
-  -- unique identifier for each row in the entity dataset.
-  row_number,
-  -- event_timestamp contains the timestamps to join onto
-  event_timestamp,
-  -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
-  NULL as {{ featureview.name }}_feature_timestamp,
-  -- created timestamp of the feature at the corresponding feature_timestamp
-  NULL as created_timestamp,
-  -- select only entities belonging to this feature set
-  {{ featureview.entities | join(', ')}},
-  -- boolean for filtering the dataset later
-  true AS is_entity_table
-FROM entity_dataframe
-UNION ALL
-SELECT
-  NULL as row_number,
-  {{ featureview.event_timestamp_column }} as event_timestamp,
-  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
-  {{ featureview.created_timestamp_column }} as created_timestamp,
-  {{ featureview.entities | join(', ')}},
-  false AS is_entity_table
-FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
-{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
-),
-/*
- 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
- well as is_entity_table.
- Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
- in the rows from the entity table should now contain the latest timestamps relative to the row's
- event_timestamp.
- For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
- feature_timestamp to null.
- */
-{{ featureview.name }}__joined AS (
-SELECT
-  row_number,
-  event_timestamp,
-  {{ featureview.entities | join(', ')}},
-  {% for feature in featureview.features %}
-  IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-  {% endfor %}
-FROM (
-SELECT
-  row_number,
-  event_timestamp,
-  {{ featureview.entities | join(', ')}},
-  FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,
-  FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
-  is_entity_table
-FROM {{ featureview.name }}__union_features
-WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
-)
-/*
- 3. Select only the rows from the entity table, and join the features from the original feature set table
- to the dataset using the entity values, feature_timestamp, and created_timestamps.
- */
-LEFT JOIN (
-SELECT
-  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
-  {{ featureview.created_timestamp_column }} as created_timestamp,
-  {{ featureview.entities | join(', ')}},
-  {% for feature in featureview.features %}
-  {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-  {% endfor %}
-FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
-{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
-) USING ({{ featureview.name }}_feature_timestamp, created_timestamp, {{ featureview.entities | join(', ')}})
-WHERE is_entity_table
-),
-/*
- 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number.
- */
-{{ featureview.name }}__deduped AS (SELECT
-  k.*
-FROM (
-  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
-  FROM {{ featureview.name }}__joined row
-  GROUP BY row_number
-)){% if loop.last %}{% else %}, {% endif %}
-
-{% endfor %}
-/*
- Joins the outputs of multiple time travel joins to a single table.
- */
-SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf
-{% for featureview in featureviews %}
-LEFT JOIN (
-SELECT
-  row_number,
-  {% for feature in featureview.features %}
-  {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-  {% endfor %}
-FROM {{ featureview.name }}__deduped
-) USING (row_number)
-{% endfor %}
-ORDER BY event_timestamp
-"""

0 commit comments
