Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
fix test
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Apr 18, 2025
commit cdae95b080c14c466c5ad22d2422fa28fd4c7bf3
1 change: 1 addition & 0 deletions sdk/python/feast/infra/compute_engines/local/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=self.start_time,
end_date=self.end_time,
)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/compute_engines/spark/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def execute(self, context: ExecutionContext) -> DAGValue:
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=self.start_time,
end_date=self.end_time,
)
Expand Down
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -202,10 +203,14 @@ def pull_all_from_table_or_query(
project=project_id,
location=config.offline_store.location,
)

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
BigQueryOfflineStore._escape_query_columns(join_key_columns)
+ BigQueryOfflineStore._escape_query_columns(feature_name_columns)
+ [timestamp_field]
+ timestamp_fields
)
timestamp_filter = get_timestamp_filter_sql(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Major change: the timestamp filter is now obtained from `get_timestamp_filter_sql`.

start_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,19 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
assert isinstance(config.offline_store, AthenaOfflineStoreConfig)
assert isinstance(data_source, AthenaSource)
from_expression = data_source.get_table_query_string(config)

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
join_key_columns + feature_name_columns + timestamp_fields
)

athena_client = aws_utils.get_athena_data_client(config.offline_store.region)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -244,8 +245,11 @@ def pull_all_from_table_or_query(
assert isinstance(data_source, CouchbaseColumnarSource)
from_expression = data_source.get_table_query_string()

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
join_key_columns + feature_name_columns + timestamp_fields
)
start_date_normalized = (
f"`{normalize_timestamp(start_date)}`" if start_date else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -186,6 +187,7 @@ def pull_all_from_table_or_query(
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
data_source_reader=_build_data_source_reader(config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,19 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
assert isinstance(data_source, PostgreSQLSource)
from_expression = data_source.get_table_query_string()

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
join_key_columns + feature_name_columns + timestamp_fields
)

timestamp_filter = get_timestamp_filter_sql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -289,8 +290,12 @@ def pull_all_from_table_or_query(
spark_session = get_spark_session_or_start_new_with_repoconfig(
store_config=config.offline_store
)

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
(fields_with_aliases, aliases) = _get_fields_with_aliases(
fields=join_key_columns + feature_name_columns + [timestamp_field],
fields=join_key_columns + feature_name_columns + timestamp_fields,
field_mappings=data_source.field_mapping,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -414,8 +415,12 @@ def pull_all_from_table_or_query(
from_expression = data_source.get_table_query_string()

client = _get_trino_client(config=config)

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
join_key_columns + feature_name_columns + timestamp_fields
)

timestamp_filter = get_timestamp_filter_sql(
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -415,7 +416,7 @@ def pull_all_from_table_or_query(
+ [timestamp_field], # avoid deduplication
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=None,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/offline_stores/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -188,6 +189,7 @@ def pull_all_from_table_or_query(
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
data_source_reader=_read_data_source,
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,16 @@ def pull_all_from_table_or_query_ibis(
timestamp_field: str,
data_source_reader: Callable[[DataSource, str], Table],
data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
staging_location: Optional[str] = None,
staging_location_endpoint_override: Optional[str] = None,
) -> RetrievalJob:
fields = join_key_columns + feature_name_columns + [timestamp_field]
timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
fields = join_key_columns + feature_name_columns + timestamp_fields
if start_date:
start_date = start_date.astimezone(tz=timezone.utc)
if end_date:
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -309,7 +310,8 @@ def pull_all_from_table_or_query(
data_source: The data source from which the entity rows will be extracted.
join_key_columns: The columns of the join keys.
feature_name_columns: The columns of the features.
timestamp_field: The timestamp column.
timestamp_field: The timestamp column, used to determine which rows are the most recent.
created_timestamp_column (Optional): The column indicating when the row was created, used to break ties.
start_date (Optional): The start of the time range.
end_date (Optional): The end of the time range.

Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,19 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
assert isinstance(data_source, RedshiftSource)
from_expression = data_source.get_table_query_string()

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = ", ".join(
join_key_columns + feature_name_columns + [timestamp_field]
join_key_columns + feature_name_columns + timestamp_fields
)

redshift_client = aws_utils.get_redshift_data_client(
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/offline_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -253,6 +254,7 @@ def pull_all_from_table_or_query(
"join_key_columns": join_key_columns,
"feature_name_columns": feature_name_columns,
"timestamp_field": timestamp_field,
"created_timestamp_column": created_timestamp_column,
"start_date": start_date.isoformat() if start_date else None,
"end_date": end_date.isoformat() if end_date else None,
}
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def pull_all_from_table_or_query(
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
Expand All @@ -242,9 +243,12 @@ def pull_all_from_table_or_query(
if not data_source.database and data_source.schema and data_source.table:
from_expression = f'"{config.offline_store.database}".{from_expression}'

timestamp_fields = [timestamp_field]
if created_timestamp_column:
timestamp_fields.append(created_timestamp_column)
field_string = (
'"'
+ '", "'.join(join_key_columns + feature_name_columns + [timestamp_field])
+ '", "'.join(join_key_columns + feature_name_columns + timestamp_fields)
+ '"'
)

Expand Down
16 changes: 9 additions & 7 deletions sdk/python/feast/offline_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,15 @@ def pull_all_from_table_or_query(self, command: dict):
assert_permissions(data_source, actions=[AuthzedAction.READ_OFFLINE])

return self.offline_store.pull_all_from_table_or_query(
self.store.config,
data_source,
command["join_key_columns"],
command["feature_name_columns"],
command["timestamp_field"],
utils.make_tzaware(datetime.fromisoformat(command["start_date"])),
utils.make_tzaware(datetime.fromisoformat(command["end_date"])),
config=self.store.config,
data_source=data_source,
join_key_columns=command["join_key_columns"],
feature_name_columns=command["feature_name_columns"],
timestamp_field=command["timestamp_field"],
start_date=utils.make_tzaware(
datetime.fromisoformat(command["start_date"])
),
end_date=utils.make_tzaware(datetime.fromisoformat(command["end_date"])),
)

def _validate_pull_latest_from_table_or_query_parameters(self, command: dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def tqdm_builder(length):
task = MaterializationTask(
project=spark_environment.project,
feature_view=driver_stats_fv,
start_time=now - timedelta(days=1),
start_time=now - timedelta(days=2),
end_time=now,
tqdm_builder=tqdm_builder,
)
Expand Down
Loading