Skip to content

Commit 6ca3b47

Browse files
committed
add init and cleanup of long-lived resources
Signed-off-by: Rob Howley <howley.robert@gmail.com>
1 parent ca9fb9b commit 6ca3b47

File tree

37 files changed

+507
-608
lines changed

37 files changed

+507
-608
lines changed

docs/getting-started/concepts/permission.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ The permission model is based on the following components:
3636
The `Permission` class identifies a single permission configured on the feature store and is identified by these attributes:
3737
- `name`: The permission name.
3838
- `types`: The list of protected resource types. Defaults to all managed types, e.g. the `ALL_RESOURCE_TYPES` alias. All sub-classes are included in the resource match.
39-
- `name_pattern`: A regex to match the resource name. Defaults to `None`, meaning that no name filtering is applied.
39+
- `name_patterns`: A list of regex patterns to match resource names. If any regex matches, the `Permission` policy is applied. Defaults to `[]`, meaning no name filtering is applied.
4040
- `required_tags`: Dictionary of key-value pairs that must match the resource tags. Defaults to `None`, meaning that no tags filtering is applied.
4141
- `actions`: The actions authorized by this permission. Defaults to `ALL_VALUES`, an alias defined in the `action` module.
4242
- `policy`: The policy to be applied to validate a client request.
@@ -95,7 +95,7 @@ The following permission grants authorization to read the offline store of all t
9595
Permission(
9696
name="reader",
9797
types=[FeatureView],
98-
name_pattern=".*risky.*",
98+
name_patterns=".*risky.*", # Accepts either a `str` or a `list[str]`
9999
policy=RoleBasedPolicy(roles=["trusted"]),
100100
actions=[AuthzedAction.READ_OFFLINE],
101101
)

docs/reference/feast-cli-commands.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,10 @@ Options:
172172

173173
```text
174174
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
175-
| NAME | TYPES | NAME_PATTERN | ACTIONS | ROLES | REQUIRED_TAGS |
175+
| NAME | TYPES | NAME_PATTERNS | ACTIONS | ROLES | REQUIRED_TAGS |
176176
+=======================+=============+=======================+===========+================+================+========+
177177
| reader_permission1234 | FeatureView | transformed_conv_rate | DESCRIBE | reader | - |
178+
| | | driver_hourly_stats | DESCRIBE | reader | - |
178179
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
179180
| writer_permission1234 | FeatureView | transformed_conv_rate | CREATE | writer | - |
180181
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+

java/serving/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@
287287
<dependency>
288288
<groupId>org.apache.avro</groupId>
289289
<artifactId>avro</artifactId>
290-
<version>1.11.3</version>
290+
<version>1.11.4</version>
291291
</dependency>
292292

293293
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-java-root -->

protos/feast/core/Permission.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ message PermissionSpec {
5050

5151
repeated Type types = 3;
5252

53-
string name_pattern = 4;
53+
repeated string name_patterns = 4;
5454

5555
map<string, string> required_tags = 5;
5656

sdk/python/feast/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1211,7 +1211,7 @@ def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list
12111211
headers=[
12121212
"NAME",
12131213
"TYPES",
1214-
"NAME_PATTERN",
1214+
"NAME_PATTERNS",
12151215
"ACTIONS",
12161216
"ROLES",
12171217
"REQUIRED_TAGS",

sdk/python/feast/cli_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def handle_not_verbose_permissions_command(
196196
[
197197
p.name,
198198
_to_multi_line([t.__name__ for t in p.types]), # type: ignore[union-attr, attr-defined]
199-
p.name_pattern,
199+
_to_multi_line(p.name_patterns),
200200
_to_multi_line([a.value.upper() for a in p.actions]),
201201
_to_multi_line(sorted(roles)),
202202
_dict_to_multi_line(p.required_tags),

sdk/python/feast/feature_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,10 @@ def async_refresh():
100100
@asynccontextmanager
101101
async def lifespan(app: FastAPI):
102102
async_refresh()
103+
await store.initialize()
103104
yield
104105
stop_refresh()
106+
await store.close()
105107

106108
app = FastAPI(lifespan=lifespan)
107109

sdk/python/feast/feature_store.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ def plan(
713713
>>> fs = FeatureStore(repo_path="project/feature_repo")
714714
>>> driver = Entity(name="driver_id", description="driver id")
715715
>>> driver_hourly_stats = FileSource(
716-
... path="project/feature_repo/data/driver_stats.parquet",
716+
... path="data/driver_stats.parquet",
717717
... timestamp_field="event_timestamp",
718718
... created_timestamp_column="created",
719719
... )
@@ -827,7 +827,7 @@ def apply(
827827
>>> fs = FeatureStore(repo_path="project/feature_repo")
828828
>>> driver = Entity(name="driver_id", description="driver id")
829829
>>> driver_hourly_stats = FileSource(
830-
... path="project/feature_repo/data/driver_stats.parquet",
830+
... path="data/driver_stats.parquet",
831831
... timestamp_field="event_timestamp",
832832
... created_timestamp_column="created",
833833
... )
@@ -2078,6 +2078,14 @@ def list_saved_datasets(
20782078
self.project, allow_cache=allow_cache, tags=tags
20792079
)
20802080

2081+
async def initialize(self) -> None:
2082+
"""Initialize long-lived clients and/or resources needed for accessing datastores"""
2083+
await self._get_provider().initialize(self.config)
2084+
2085+
async def close(self) -> None:
2086+
"""Cleanup any long-lived clients and/or resources"""
2087+
await self._get_provider().close()
2088+
20812089

20822090
def _print_materialization_log(
20832091
start_date, end_date, num_feature_views: int, online_store: str

sdk/python/feast/infra/offline_stores/dask.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(
5757
self,
5858
evaluation_function: Callable,
5959
full_feature_names: bool,
60+
repo_path: str,
6061
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
6162
metadata: Optional[RetrievalMetadata] = None,
6263
):
@@ -67,6 +68,7 @@ def __init__(
6768
self._full_feature_names = full_feature_names
6869
self._on_demand_feature_views = on_demand_feature_views or []
6970
self._metadata = metadata
71+
self.repo_path = repo_path
7072

7173
@property
7274
def full_feature_names(self) -> bool:
@@ -99,8 +101,13 @@ def persist(
99101
if not allow_overwrite and os.path.exists(storage.file_options.uri):
100102
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)
101103

104+
if not Path(storage.file_options.uri).is_absolute():
105+
absolute_path = Path(self.repo_path) / storage.file_options.uri
106+
else:
107+
absolute_path = Path(storage.file_options.uri)
108+
102109
filesystem, path = FileSource.create_filesystem_and_path(
103-
storage.file_options.uri,
110+
str(absolute_path),
104111
storage.file_options.s3_endpoint_override,
105112
)
106113

@@ -243,7 +250,9 @@ def evaluate_historical_retrieval():
243250

244251
all_join_keys = list(set(all_join_keys + join_keys))
245252

246-
df_to_join = _read_datasource(feature_view.batch_source)
253+
df_to_join = _read_datasource(
254+
feature_view.batch_source, config.repo_path
255+
)
247256

248257
df_to_join, timestamp_field = _field_mapping(
249258
df_to_join,
@@ -297,6 +306,7 @@ def evaluate_historical_retrieval():
297306
min_event_timestamp=entity_df_event_timestamp_range[0],
298307
max_event_timestamp=entity_df_event_timestamp_range[1],
299308
),
309+
repo_path=str(config.repo_path),
300310
)
301311
return job
302312

@@ -316,7 +326,7 @@ def pull_latest_from_table_or_query(
316326

317327
# Create lazy function that is only called from the RetrievalJob object
318328
def evaluate_offline_job():
319-
source_df = _read_datasource(data_source)
329+
source_df = _read_datasource(data_source, config.repo_path)
320330

321331
source_df = _normalize_timestamp(
322332
source_df, timestamp_field, created_timestamp_column
@@ -377,6 +387,7 @@ def evaluate_offline_job():
377387
return DaskRetrievalJob(
378388
evaluation_function=evaluate_offline_job,
379389
full_feature_names=False,
390+
repo_path=str(config.repo_path),
380391
)
381392

382393
@staticmethod
@@ -420,8 +431,13 @@ def write_logged_features(
420431
# Since this code will be mostly used from Go-created thread, it's better to avoid producing new threads
421432
data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False)
422433

434+
if config.repo_path is not None and not Path(destination.path).is_absolute():
435+
absolute_path = config.repo_path / destination.path
436+
else:
437+
absolute_path = Path(destination.path)
438+
423439
filesystem, path = FileSource.create_filesystem_and_path(
424-
destination.path,
440+
str(absolute_path),
425441
destination.s3_endpoint_override,
426442
)
427443

@@ -456,8 +472,14 @@ def offline_write_batch(
456472
)
457473

458474
file_options = feature_view.batch_source.file_options
475+
476+
if config.repo_path is not None and not Path(file_options.uri).is_absolute():
477+
absolute_path = config.repo_path / file_options.uri
478+
else:
479+
absolute_path = Path(file_options.uri)
480+
459481
filesystem, path = FileSource.create_filesystem_and_path(
460-
file_options.uri, file_options.s3_endpoint_override
482+
str(absolute_path), file_options.s3_endpoint_override
461483
)
462484
prev_table = pyarrow.parquet.read_table(
463485
path, filesystem=filesystem, memory_map=True
@@ -493,7 +515,7 @@ def _get_entity_df_event_timestamp_range(
493515
)
494516

495517

496-
def _read_datasource(data_source) -> dd.DataFrame:
518+
def _read_datasource(data_source, repo_path) -> dd.DataFrame:
497519
storage_options = (
498520
{
499521
"client_kwargs": {
@@ -504,8 +526,12 @@ def _read_datasource(data_source) -> dd.DataFrame:
504526
else None
505527
)
506528

529+
if not Path(data_source.path).is_absolute():
530+
path = repo_path / data_source.path
531+
else:
532+
path = data_source.path
507533
return dd.read_parquet(
508-
data_source.path,
534+
path,
509535
storage_options=storage_options,
510536
)
511537

sdk/python/feast/infra/offline_stores/duckdb.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from feast.repo_config import FeastConfigBaseModel, RepoConfig
2828

2929

30-
def _read_data_source(data_source: DataSource) -> Table:
30+
def _read_data_source(data_source: DataSource, repo_path: str) -> Table:
3131
assert isinstance(data_source, FileSource)
3232

3333
if isinstance(data_source.file_format, ParquetFormat):
@@ -43,21 +43,32 @@ def _read_data_source(data_source: DataSource) -> Table:
4343
def _write_data_source(
4444
table: Table,
4545
data_source: DataSource,
46+
repo_path: str,
4647
mode: str = "append",
4748
allow_overwrite: bool = False,
4849
):
4950
assert isinstance(data_source, FileSource)
5051

5152
file_options = data_source.file_options
5253

53-
if mode == "overwrite" and not allow_overwrite and os.path.exists(file_options.uri):
54+
if not Path(file_options.uri).is_absolute():
55+
absolute_path = Path(repo_path) / file_options.uri
56+
else:
57+
absolute_path = Path(file_options.uri)
58+
59+
if (
60+
mode == "overwrite"
61+
and not allow_overwrite
62+
and os.path.exists(str(absolute_path))
63+
):
5464
raise SavedDatasetLocationAlreadyExists(location=file_options.uri)
5565

5666
if isinstance(data_source.file_format, ParquetFormat):
5767
if mode == "overwrite":
5868
table = table.to_pyarrow()
69+
5970
filesystem, path = FileSource.create_filesystem_and_path(
60-
file_options.uri,
71+
str(absolute_path),
6172
file_options.s3_endpoint_override,
6273
)
6374

0 commit comments

Comments
 (0)