Skip to content

Commit 6a04c48

Browse files
authored
feat: Add s3 remote storage export for duckdb (feast-dev#4195)
Add S3 remote export and tests for DuckDB. Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
1 parent a17725d commit 6a04c48

10 files changed

Lines changed: 191 additions & 26 deletions

File tree

sdk/python/feast/infra/offline_stores/duckdb.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ def _read_data_source(data_source: DataSource) -> Table:
3333
if isinstance(data_source.file_format, ParquetFormat):
3434
return ibis.read_parquet(data_source.path)
3535
elif isinstance(data_source.file_format, DeltaFormat):
36-
return ibis.read_delta(data_source.path)
36+
storage_options = {
37+
"AWS_ENDPOINT_URL": data_source.s3_endpoint_override,
38+
}
39+
40+
return ibis.read_delta(data_source.path, storage_options=storage_options)
3741

3842

3943
def _write_data_source(
@@ -72,10 +76,18 @@ def _write_data_source(
7276
new_table = pyarrow.concat_tables([table, prev_table])
7377
ibis.memtable(new_table).to_parquet(file_options.uri)
7478
elif isinstance(data_source.file_format, DeltaFormat):
79+
storage_options = {
80+
"AWS_ENDPOINT_URL": str(data_source.s3_endpoint_override),
81+
}
82+
7583
if mode == "append":
7684
from deltalake import DeltaTable
7785

78-
prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
86+
prev_schema = (
87+
DeltaTable(file_options.uri, storage_options=storage_options)
88+
.schema()
89+
.to_pyarrow()
90+
)
7991
table = table.cast(ibis.Schema.from_pyarrow(prev_schema))
8092
write_mode = "append"
8193
elif mode == "overwrite":
@@ -85,13 +97,19 @@ def _write_data_source(
8597
else "error"
8698
)
8799

88-
table.to_delta(file_options.uri, mode=write_mode)
100+
table.to_delta(
101+
file_options.uri, mode=write_mode, storage_options=storage_options
102+
)
89103

90104

91105
class DuckDBOfflineStoreConfig(FeastConfigBaseModel):
92106
type: StrictStr = "duckdb"
93107
# """ Offline store type selector"""
94108

109+
staging_location: Optional[str] = None
110+
111+
staging_location_endpoint_override: Optional[str] = None
112+
95113

96114
class DuckDBOfflineStore(OfflineStore):
97115
@staticmethod
@@ -116,6 +134,8 @@ def pull_latest_from_table_or_query(
116134
end_date=end_date,
117135
data_source_reader=_read_data_source,
118136
data_source_writer=_write_data_source,
137+
staging_location=config.offline_store.staging_location,
138+
staging_location_endpoint_override=config.offline_store.staging_location_endpoint_override,
119139
)
120140

121141
@staticmethod
@@ -138,6 +158,8 @@ def get_historical_features(
138158
full_feature_names=full_feature_names,
139159
data_source_reader=_read_data_source,
140160
data_source_writer=_write_data_source,
161+
staging_location=config.offline_store.staging_location,
162+
staging_location_endpoint_override=config.offline_store.staging_location_endpoint_override,
141163
)
142164

143165
@staticmethod
@@ -160,6 +182,8 @@ def pull_all_from_table_or_query(
160182
end_date=end_date,
161183
data_source_reader=_read_data_source,
162184
data_source_writer=_write_data_source,
185+
staging_location=config.offline_store.staging_location,
186+
staging_location_endpoint_override=config.offline_store.staging_location_endpoint_override,
163187
)
164188

165189
@staticmethod

sdk/python/feast/infra/offline_stores/file_source.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,15 @@ def get_table_column_names_and_types(
179179
elif isinstance(self.file_format, DeltaFormat):
180180
from deltalake import DeltaTable
181181

182-
schema = DeltaTable(self.path).schema().to_pyarrow()
182+
storage_options = {
183+
"AWS_ENDPOINT_URL": str(self.s3_endpoint_override),
184+
}
185+
186+
schema = (
187+
DeltaTable(self.path, storage_options=storage_options)
188+
.schema()
189+
.to_pyarrow()
190+
)
183191
else:
184192
raise Exception(f"Unknown FileFormat -> {self.file_format}")
185193

sdk/python/feast/infra/offline_stores/ibis.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def pull_latest_from_table_or_query_ibis(
4747
end_date: datetime,
4848
data_source_reader: Callable[[DataSource], Table],
4949
data_source_writer: Callable[[pyarrow.Table, DataSource], None],
50+
staging_location: Optional[str] = None,
51+
staging_location_endpoint_override: Optional[str] = None,
5052
) -> RetrievalJob:
5153
fields = join_key_columns + feature_name_columns + [timestamp_field]
5254
if created_timestamp_column:
@@ -82,6 +84,8 @@ def pull_latest_from_table_or_query_ibis(
8284
full_feature_names=False,
8385
metadata=None,
8486
data_source_writer=data_source_writer,
87+
staging_location=staging_location,
88+
staging_location_endpoint_override=staging_location_endpoint_override,
8589
)
8690

8791

@@ -140,6 +144,8 @@ def get_historical_features_ibis(
140144
data_source_reader: Callable[[DataSource], Table],
141145
data_source_writer: Callable[[pyarrow.Table, DataSource], None],
142146
full_feature_names: bool = False,
147+
staging_location: Optional[str] = None,
148+
staging_location_endpoint_override: Optional[str] = None,
143149
) -> RetrievalJob:
144150
entity_schema = _get_entity_schema(
145151
entity_df=entity_df,
@@ -231,6 +237,8 @@ def read_fv(
231237
max_event_timestamp=timestamp_range[1],
232238
),
233239
data_source_writer=data_source_writer,
240+
staging_location=staging_location,
241+
staging_location_endpoint_override=staging_location_endpoint_override,
234242
)
235243

236244

@@ -244,6 +252,8 @@ def pull_all_from_table_or_query_ibis(
244252
end_date: datetime,
245253
data_source_reader: Callable[[DataSource], Table],
246254
data_source_writer: Callable[[pyarrow.Table, DataSource], None],
255+
staging_location: Optional[str] = None,
256+
staging_location_endpoint_override: Optional[str] = None,
247257
) -> RetrievalJob:
248258
fields = join_key_columns + feature_name_columns + [timestamp_field]
249259
start_date = start_date.astimezone(tz=utc)
@@ -270,6 +280,8 @@ def pull_all_from_table_or_query_ibis(
270280
full_feature_names=False,
271281
metadata=None,
272282
data_source_writer=data_source_writer,
283+
staging_location=staging_location,
284+
staging_location_endpoint_override=staging_location_endpoint_override,
273285
)
274286

275287

@@ -411,6 +423,23 @@ def point_in_time_join(
411423
return acc_table
412424

413425

426+
def list_s3_files(path: str, endpoint_url: str) -> List[str]:
427+
import boto3
428+
429+
s3 = boto3.client("s3", endpoint_url=endpoint_url)
430+
if path.startswith("s3://"):
431+
path = path[len("s3://") :]
432+
bucket, prefix = path.split("/", 1)
433+
objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
434+
contents = objects["Contents"]
435+
files = [
436+
f"s3://{bucket}/{content['Key']}"
437+
for content in contents
438+
if content["Key"].endswith("parquet")
439+
]
440+
return files
441+
442+
414443
class IbisRetrievalJob(RetrievalJob):
415444
def __init__(
416445
self,
@@ -419,6 +448,8 @@ def __init__(
419448
full_feature_names,
420449
metadata,
421450
data_source_writer,
451+
staging_location,
452+
staging_location_endpoint_override,
422453
) -> None:
423454
super().__init__()
424455
self.table = table
@@ -428,6 +459,8 @@ def __init__(
428459
self._full_feature_names = full_feature_names
429460
self._metadata = metadata
430461
self.data_source_writer = data_source_writer
462+
self.staging_location = staging_location
463+
self.staging_location_endpoint_override = staging_location_endpoint_override
431464

432465
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
433466
return self.table.execute()
@@ -456,3 +489,15 @@ def persist(
456489
@property
457490
def metadata(self) -> Optional[RetrievalMetadata]:
458491
return self._metadata
492+
493+
def supports_remote_storage_export(self) -> bool:
494+
return self.staging_location is not None
495+
496+
def to_remote_storage(self) -> List[str]:
497+
path = self.staging_location + f"/{str(uuid.uuid4())}"
498+
499+
storage_options = {"AWS_ENDPOINT_URL": self.staging_location_endpoint_override}
500+
501+
self.table.to_delta(path, storage_options=storage_options)
502+
503+
return list_s3_files(path, self.staging_location_endpoint_override)

sdk/python/pytest.ini

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,10 @@ markers =
55

66
env =
77
FEAST_USAGE=False
8-
IS_TEST=True
8+
IS_TEST=True
9+
10+
filterwarnings =
11+
ignore::DeprecationWarning:pyspark.sql.pandas.*:
12+
ignore::DeprecationWarning:pyspark.sql.connect.*:
13+
ignore::DeprecationWarning:httpx.*:
14+
ignore::FutureWarning:ibis_substrait.compiler.*:

sdk/python/tests/conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
# limitations under the License.
1414
import logging
1515
import multiprocessing
16+
import os
1617
import random
1718
from datetime import datetime, timedelta
1819
from multiprocessing import Process
1920
from sys import platform
2021
from typing import Any, Dict, List, Tuple, no_type_check
22+
from unittest import mock
2123

2224
import pandas as pd
2325
import pytest
@@ -180,7 +182,11 @@ def environment(request, worker_id):
180182
request.param, worker_id=worker_id, fixture_request=request
181183
)
182184

183-
yield e
185+
if hasattr(e.data_source_creator, "mock_environ"):
186+
with mock.patch.dict(os.environ, e.data_source_creator.mock_environ):
187+
yield e
188+
else:
189+
yield e
184190

185191
e.feature_store.teardown()
186192
e.data_source_creator.teardown()

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from tests.integration.feature_repos.universal.data_sources.file import (
3434
DuckDBDataSourceCreator,
3535
DuckDBDeltaDataSourceCreator,
36+
DuckDBDeltaS3DataSourceCreator,
3637
FileDataSourceCreator,
3738
)
3839
from tests.integration.feature_repos.universal.data_sources.redshift import (
@@ -122,6 +123,14 @@
122123
("local", DuckDBDeltaDataSourceCreator),
123124
]
124125

126+
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
127+
AVAILABLE_OFFLINE_STORES.extend(
128+
[
129+
("local", DuckDBDeltaS3DataSourceCreator),
130+
]
131+
)
132+
133+
125134
AVAILABLE_ONLINE_STORES: Dict[
126135
str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]]
127136
] = {

sdk/python/tests/integration/feature_repos/universal/data_sources/file.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from minio import Minio
1111
from testcontainers.core.generic import DockerContainer
1212
from testcontainers.core.waiting_utils import wait_for_logs
13+
from testcontainers.minio import MinioContainer
1314

1415
from feast import FileSource
1516
from feast.data_format import DeltaFormat, ParquetFormat
@@ -134,6 +135,74 @@ def create_logged_features_destination(self) -> LoggingDestination:
134135
return FileLoggingDestination(path=d)
135136

136137

138+
class DeltaS3FileSourceCreator(FileDataSourceCreator):
139+
def __init__(self, project_name: str, *args, **kwargs):
140+
super().__init__(project_name)
141+
self.minio = MinioContainer()
142+
self.minio.start()
143+
client = self.minio.get_client()
144+
client.make_bucket("test")
145+
host_ip = self.minio.get_container_host_ip()
146+
exposed_port = self.minio.get_exposed_port(self.minio.port)
147+
self.endpoint_url = f"http://{host_ip}:{exposed_port}"
148+
149+
self.mock_environ = {
150+
"AWS_ACCESS_KEY_ID": self.minio.access_key,
151+
"AWS_SECRET_ACCESS_KEY": self.minio.secret_key,
152+
"AWS_EC2_METADATA_DISABLED": "true",
153+
"AWS_REGION": "us-east-1",
154+
"AWS_ALLOW_HTTP": "true",
155+
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
156+
}
157+
158+
def create_data_source(
159+
self,
160+
df: pd.DataFrame,
161+
destination_name: str,
162+
created_timestamp_column="created_ts",
163+
field_mapping: Optional[Dict[str, str]] = None,
164+
timestamp_field: Optional[str] = "ts",
165+
) -> DataSource:
166+
from deltalake.writer import write_deltalake
167+
168+
destination_name = self.get_prefixed_table_name(destination_name)
169+
170+
storage_options = {
171+
"AWS_ACCESS_KEY_ID": self.minio.access_key,
172+
"AWS_SECRET_ACCESS_KEY": self.minio.secret_key,
173+
"AWS_ENDPOINT_URL": self.endpoint_url,
174+
}
175+
176+
path = f"s3://test/{str(uuid.uuid4())}/{destination_name}"
177+
178+
write_deltalake(path, df, storage_options=storage_options)
179+
180+
return FileSource(
181+
file_format=DeltaFormat(),
182+
path=path,
183+
timestamp_field=timestamp_field,
184+
created_timestamp_column=created_timestamp_column,
185+
field_mapping=field_mapping or {"ts_1": "ts"},
186+
s3_endpoint_override=self.endpoint_url,
187+
)
188+
189+
def create_saved_dataset_destination(self) -> SavedDatasetFileStorage:
190+
return SavedDatasetFileStorage(
191+
path=f"s3://test/{str(uuid.uuid4())}",
192+
file_format=DeltaFormat(),
193+
s3_endpoint_override=self.endpoint_url,
194+
)
195+
196+
# LoggingDestination is parquet-only
197+
def create_logged_features_destination(self) -> LoggingDestination:
198+
d = tempfile.mkdtemp(prefix=self.project_name)
199+
self.keep.append(d)
200+
return FileLoggingDestination(path=d)
201+
202+
def teardown(self):
203+
self.minio.stop()
204+
205+
137206
class FileParquetDatasetSourceCreator(FileDataSourceCreator):
138207
def create_data_source(
139208
self,
@@ -273,3 +342,12 @@ class DuckDBDeltaDataSourceCreator(DeltaFileSourceCreator):
273342
def create_offline_store_config(self):
274343
self.duckdb_offline_store_config = DuckDBOfflineStoreConfig()
275344
return self.duckdb_offline_store_config
345+
346+
347+
class DuckDBDeltaS3DataSourceCreator(DeltaS3FileSourceCreator):
348+
def create_offline_store_config(self):
349+
self.duckdb_offline_store_config = DuckDBOfflineStoreConfig(
350+
staging_location="s3://test/staging",
351+
staging_location_endpoint_override=self.endpoint_url,
352+
)
353+
return self.duckdb_offline_store_config

sdk/python/tests/integration/materialization/contrib/spark/test_spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_spark_materialization_consistency():
3131
batch_engine={"type": "spark.engine", "partitions": 10},
3232
)
3333
spark_environment = construct_test_environment(
34-
spark_config, None, entity_key_serialization_version=1
34+
spark_config, None, entity_key_serialization_version=2
3535
)
3636

3737
df = create_basic_driver_dataset()

sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ def test_historical_features(
139139

140140
if job_from_df.supports_remote_storage_export():
141141
files = job_from_df.to_remote_storage()
142-
print(files)
143-
assert len(files) > 0 # This test should be way more detailed
142+
assert len(files) # 0 # This test should be way more detailed
144143

145144
start_time = datetime.utcnow()
146145
actual_df_from_df_entities = job_from_df.to_df()

0 commit comments

Comments (0)