Skip to content
Merged
Prev Previous commit
Next Next commit
update
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Apr 18, 2025
commit e92d01549f60e64691881ea922e20acef7ac8ce1
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/compute_engines/feature_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def _should_validate(self):
def build(self) -> ExecutionPlan:
last_node = self.build_source_node()

# PIT join entities to the feature data, and perform filtering
if isinstance(self.task, HistoricalRetrievalTask):
last_node = self.build_join_node(last_node)
# Join entity_df with source if needed
last_node = self.build_join_node(last_node)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without an entity_df provided, the join node is just a pass-through node.


# PIT filter, TTL, and user-defined filter
last_node = self.build_filter_node(last_node)

if self._should_aggregate():
Expand Down
23 changes: 0 additions & 23 deletions sdk/python/feast/infra/compute_engines/local/feature_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from feast.infra.common.materialization_job import MaterializationTask
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.dag.plan import ExecutionPlan
from feast.infra.compute_engines.feature_builder import FeatureBuilder
from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.nodes import (
Expand Down Expand Up @@ -95,25 +94,3 @@ def build_output_nodes(self, input_node):
node = LocalOutputNode("output")
node.add_input(input_node)
self.nodes.append(node)

def build(self) -> ExecutionPlan:
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it, as it is the same as the parent class's build method.

last_node = self.build_source_node()

if isinstance(self.task, HistoricalRetrievalTask):
last_node = self.build_join_node(last_node)

last_node = self.build_filter_node(last_node)

if self._should_aggregate():
last_node = self.build_aggregation_node(last_node)
elif isinstance(self.task, HistoricalRetrievalTask):
last_node = self.build_dedup_node(last_node)

if self._should_transform():
last_node = self.build_transformation_node(last_node)

if self._should_validate():
last_node = self.build_validation_node(last_node)

self.build_output_nodes(last_node)
return ExecutionPlan(self.nodes)
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/compute_engines/spark/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ def execute(self, context: ExecutionContext) -> DAGValue:
feature_df: DataFrame = feature_value.data

entity_df = context.entity_df
assert entity_df is not None, "entity_df must be set in ExecutionContext"
if not entity_df:
return DAGValue(
data=feature_df,
format=DAGFormat.SPARK,
metadata={"joined_on": None},
)

# Get timestamp fields from feature view
join_keys, feature_cols, ts_col, created_ts_col = context.column_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,12 @@ def pull_all_from_table_or_query(
join_key_columns + feature_name_columns + [timestamp_field]
)

start_date_str = None
if start_date:
start_date_str = f"'{start_date}'::timestamptz"
end_date_str = None
if end_date:
end_date_str = f"'{end_date}'::timestamptz"
timestamp_filter = get_timestamp_filter_sql(
start_date_str, end_date_str, timestamp_field, tz=timezone.utc
start_date,
end_date,
timestamp_field,
tz=timezone.utc,
cast_style="timestamptz",
)

query = f"""
Expand Down
67 changes: 40 additions & 27 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, KeysView, List, Optional, Set, Tuple, Union
from typing import Any, Dict, KeysView, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -274,52 +274,65 @@ def get_timestamp_filter_sql(
timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
date_partition_column: Optional[str] = None,
tz: Optional[timezone] = None,
cast_style: Literal["timestamp_func", "timestamptz", "raw"] = "timestamp_func",
) -> str:
"""
Returns a SQL WHERE clause with timestamp filter and optional date partition pruning.

- Datetime inputs are converted using .isoformat() or .strftime('%Y-%m-%d')
- String inputs are passed through as-is (assumed to be preformatted)
- Uses TIMESTAMP(...) for timestamp_field
- Adds 'AND ...' conditions for partition column if provided

Example:
WHERE event_timestamp BETWEEN TIMESTAMP('...') AND TIMESTAMP('...')
AND ds >= '2023-04-01' AND ds <= '2023-04-05'
Returns SQL filter condition (no WHERE) with flexible timestamp casting.

Args:
start_date: datetime or ISO8601 strings
end_date: datetime or ISO8601 strings
timestamp_field: main timestamp column
date_partition_column: optional partition column (for pruning)
tz: optional timezone for datetime inputs
cast_style: one of:
- "timestamp_func": TIMESTAMP('...') → Snowflake, BigQuery, Athena
- "timestamptz": '...'::timestamptz → PostgreSQL
- "raw": '...' → no cast, string only

Returns:
SQL filter string without WHERE
"""

def format_timestamp(val: Union[str, datetime]) -> str:
def format_casted_ts(val: Union[str, datetime]) -> str:
if isinstance(val, datetime):
if tz:
val = val.astimezone(tz)
val = val.isoformat()
return f"TIMESTAMP('{val}')"
val_str = val.isoformat()
else:
val_str = val

if cast_style == "timestamp_func":
return f"TIMESTAMP('{val_str}')"
elif cast_style == "timestamptz":
return f"'{val_str}'::timestamptz"
else: # raw
return f"'{val_str}'"

def format_date(val: Union[str, datetime]) -> str:
if isinstance(val, datetime):
if tz:
val = val.astimezone(tz)
return val.strftime("%Y-%m-%d")
return val # assume already formatted like 'YYYY-MM-DD'
return val

filters = []

# Timestamp filtering
ts_start = format_timestamp(start_date) if start_date else None
ts_end = format_timestamp(end_date) if end_date else None

if ts_start and ts_end:
filters.append(f"{timestamp_field} BETWEEN {ts_start} AND {ts_end}")
elif ts_start:
filters.append(f"{timestamp_field} >= {ts_start}")
elif ts_end:
filters.append(f"{timestamp_field} <= {ts_end}")
# Timestamp filters
if start_date and end_date:
filters.append(
f"{timestamp_field} BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}"
)
elif start_date:
filters.append(f"{timestamp_field} >= {format_casted_ts(start_date)}")
elif end_date:
filters.append(f"{timestamp_field} <= {format_casted_ts(end_date)}")

# Date partition pruning
# Partition pruning
if date_partition_column:
if start_date:
filters.append(f"{date_partition_column} >= '{format_date(start_date)}'")
if end_date:
filters.append(f"{date_partition_column} <= '{format_date(end_date)}'")

return "WHERE " + " AND ".join(filters) if filters else ""
return " AND ".join(filters) if filters else ""
Loading