Skip to content

Commit 78fcd72

Browse files
Jwrede authored and rpathade committed
fix(trino): Clean up temporary entity tables after retrieval (feast-dev#6381)
* fix(trino): Clean up temporary entity tables after retrieval

  TrinoOfflineStore.get_historical_features() creates a temporary table for the entity DataFrame but never drops it, leaking tables indefinitely. Apply the same context manager pattern used by BigQuery, Redshift, and Athena offline stores: wrap the query in a generator that issues DROP TABLE IF EXISTS in a finally block.

  Fixes feast-dev#6306

  Signed-off-by: Jonathan Wrede <wrede.jonathan00@gmail.com>

* fix: sort imports for ruff compliance

  Signed-off-by: Jonathan Wrede <wrede.jonathan00@gmail.com>

* fix: decouple temp table cleanup from query access

  Avoid dropping the temporary entity table on to_sql() calls. Previously, every method used a context manager that dropped the table on exit, so calling to_sql() before to_df() would destroy the table and cause subsequent queries to fail.

  Now the query is stored as a plain string and cleanup is handled by a dedicated _drop_temp_table() method called only after query execution (to_df, to_trino). A __del__ fallback ensures cleanup if execution methods are never called. The _cleaned_up flag makes the drop idempotent.

  Signed-off-by: Jonathan Wrede <wrede.jonathan00@gmail.com>

---------

Signed-off-by: Jonathan Wrede <wrede.jonathan00@gmail.com>
Signed-off-by: RutujaPathade <73137503+RutujaPathade@users.noreply.github.com>
1 parent de050ab commit 78fcd72

1 file changed

Lines changed: 45 additions & 10 deletions

File tree

  • sdk/python/feast/infra/offline_stores/contrib/trino_offline_store

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py

Lines changed: 45 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,15 @@
1+
import logging
12
import uuid
23
from datetime import date, datetime
3-
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
4+
from typing import (
5+
Any,
6+
Dict,
7+
List,
8+
Literal,
9+
Optional,
10+
Tuple,
11+
Union,
12+
)
413

514
import numpy as np
615
import pandas as pd
@@ -37,6 +46,8 @@
3746
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3847
from feast.saved_dataset import SavedDatasetStorage
3948

49+
logger = logging.getLogger(__name__)
50+
4051

4152
class BasicAuthModel(FeastConfigBaseModel):
4253
username: StrictStr
@@ -183,13 +194,16 @@ def __init__(
183194
full_feature_names: bool,
184195
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
185196
metadata: Optional[RetrievalMetadata] = None,
197+
temp_table: Optional[str] = None,
186198
):
187199
self._query = query
188200
self._client = client
189201
self._config = config
190202
self._full_feature_names = full_feature_names
191203
self._on_demand_feature_views = on_demand_feature_views or []
192204
self._metadata = metadata
205+
self._temp_table = temp_table
206+
self._cleaned_up = False
193207

194208
@property
195209
def full_feature_names(self) -> bool:
@@ -199,11 +213,29 @@ def full_feature_names(self) -> bool:
199213
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
200214
return self._on_demand_feature_views
201215

216+
def _drop_temp_table(self) -> None:
217+
if self._cleaned_up or not self._temp_table:
218+
return
219+
self._cleaned_up = True
220+
try:
221+
self._client.execute_query(f"DROP TABLE IF EXISTS {self._temp_table}")
222+
except Exception:
223+
logger.exception(
224+
"Failed to drop temporary entity table %s",
225+
self._temp_table,
226+
)
227+
228+
def __del__(self) -> None:
229+
self._drop_temp_table()
230+
202231
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
203232
"""Return dataset as Pandas DataFrame synchronously including on demand transforms"""
204-
results = self._client.execute_query(query_text=self._query)
205-
self.pyarrow_schema = results.pyarrow_schema
206-
return results.to_dataframe()
233+
try:
234+
results = self._client.execute_query(query_text=self._query)
235+
self.pyarrow_schema = results.pyarrow_schema
236+
return results.to_dataframe()
237+
finally:
238+
self._drop_temp_table()
207239

208240
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
209241
"""Return payrrow dataset as synchronously including on demand transforms"""
@@ -234,8 +266,11 @@ def to_trino(
234266
destination_table = f"{self._client.catalog}.{self._config.offline_store.dataset}.historical_{today}_{rand_id}"
235267

236268
# TODO: Implement the timeout logic
237-
query = f"CREATE TABLE {destination_table} AS ({self._query})"
238-
self._client.execute_query(query_text=query)
269+
try:
270+
create_query = f"CREATE TABLE {destination_table} AS ({self._query})"
271+
self._client.execute_query(query_text=create_query)
272+
finally:
273+
self._drop_temp_table()
239274
return destination_table
240275

241276
def persist(
@@ -372,11 +407,12 @@ def get_historical_features(
372407
)
373408

374409
# Generate the Trino SQL query from the query context
410+
entity_table_ref = table_reference
375411
if type(entity_df) is str:
376-
table_reference = f"({entity_df})"
412+
entity_table_ref = f"({entity_df})"
377413
query = offline_utils.build_point_in_time_query(
378414
query_context,
379-
left_table_query_string=table_reference,
415+
left_table_query_string=entity_table_ref,
380416
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
381417
entity_df_columns=entity_schema.keys(),
382418
query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
@@ -385,6 +421,7 @@ def get_historical_features(
385421

386422
return TrinoRetrievalJob(
387423
query=query,
424+
temp_table=table_reference if isinstance(entity_df, pd.DataFrame) else None,
388425
client=client,
389426
config=config,
390427
full_feature_names=full_feature_names,
@@ -483,8 +520,6 @@ def _upload_entity_df_and_get_entity_schema(
483520
else:
484521
raise InvalidEntityType(type(entity_df))
485522

486-
# TODO: Ensure that the table expires after some time
487-
488523

489524
def _get_trino_client(config: RepoConfig) -> Trino:
490525
auth = None

0 commit comments

Comments
 (0)