Merged
Changes from 1 commit
add init and cleanup of long lived resources
Signed-off-by: Rob Howley <howley.robert@gmail.com>
robhowley committed Oct 17, 2024
commit 6ca3b47cc84a9e8aecf06a32ae5c782e2d8f1bd7
4 changes: 2 additions & 2 deletions docs/getting-started/concepts/permission.md
@@ -36,7 +36,7 @@ The permission model is based on the following components:
The `Permission` class identifies a single permission configured on the feature store and is identified by these attributes:
- `name`: The permission name.
- `types`: The list of protected resource types. Defaults to all managed types, e.g. the `ALL_RESOURCE_TYPES` alias. All sub-classes are included in the resource match.
- `name_pattern`: A regex to match the resource name. Defaults to `None`, meaning that no name filtering is applied
- `name_patterns`: A list of regex patterns to match resource names. If any regex matches, the `Permission` policy is applied. Defaults to `[]`, meaning no name filtering is applied.
- `required_tags`: Dictionary of key-value pairs that must match the resource tags. Defaults to `None`, meaning that no tag filtering is applied.
- `actions`: The actions authorized by this permission. Defaults to `ALL_VALUES`, an alias defined in the `action` module.
- `policy`: The policy to be applied to validate a client request.
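For example, a single permission can now match several naming conventions at once. A minimal sketch of the plural form (import paths are assumed from the SDK layout and are not part of this diff):

```python
from feast import FeatureView
from feast.permissions.action import AuthzedAction
from feast.permissions.permission import Permission
from feast.permissions.policy import RoleBasedPolicy

risky_reader = Permission(
    name="risky-reader",
    types=[FeatureView],
    # The policy applies when ANY of these regexes matches a resource name;
    # the default [] disables name filtering entirely.
    name_patterns=[".*risky.*", ".*experimental.*"],
    policy=RoleBasedPolicy(roles=["trusted"]),
    actions=[AuthzedAction.READ_OFFLINE],
)
```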
@@ -95,7 +95,7 @@ The following permission grants authorization to read the offline store of all t
Permission(
name="reader",
types=[FeatureView],
name_pattern=".*risky.*",
name_patterns=".*risky.*", # Accepts both `str` or `list[str]` types
policy=RoleBasedPolicy(roles=["trusted"]),
actions=[AuthzedAction.READ_OFFLINE],
)
3 changes: 2 additions & 1 deletion docs/reference/feast-cli-commands.md
@@ -172,9 +172,10 @@ Options:

```text
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
| NAME                  | TYPES       | NAME_PATTERN          | ACTIONS   | ROLES          | REQUIRED_TAGS           |
| NAME                  | TYPES       | NAME_PATTERNS         | ACTIONS   | ROLES          | REQUIRED_TAGS           |
+=======================+=============+=======================+===========+================+=========================+
| reader_permission1234 | FeatureView | transformed_conv_rate | DESCRIBE  | reader         | -                       |
|                       |             | driver_hourly_stats   | DESCRIBE  | reader         | -                       |
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
| writer_permission1234 | FeatureView | transformed_conv_rate | CREATE    | writer         | -                       |
+-----------------------+-------------+-----------------------+-----------+----------------+-------------------------+
2 changes: 1 addition & 1 deletion java/serving/pom.xml
@@ -287,7 +287,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
<version>1.11.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-java-root -->
2 changes: 1 addition & 1 deletion protos/feast/core/Permission.proto
@@ -50,7 +50,7 @@ message PermissionSpec {

repeated Type types = 3;

string name_pattern = 4;
repeated string name_patterns = 4;

map<string, string> required_tags = 5;

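Side note for reviewers (a general protobuf compatibility rule, not something verified against Feast's own migration tooling): for `string` fields, a singular field may be changed to `repeated` without breaking the wire format, so keeping field number 4 means a previously serialized `name_pattern` value parses as a one-element `name_patterns` list.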
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -1211,7 +1211,7 @@ def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list
headers=[
"NAME",
"TYPES",
"NAME_PATTERN",
"NAME_PATTERNS",
"ACTIONS",
"ROLES",
"REQUIRED_TAGS",
2 changes: 1 addition & 1 deletion sdk/python/feast/cli_utils.py
@@ -196,7 +196,7 @@ def handle_not_verbose_permissions_command(
[
p.name,
_to_multi_line([t.__name__ for t in p.types]), # type: ignore[union-attr, attr-defined]
p.name_pattern,
_to_multi_line(p.name_patterns),
_to_multi_line([a.value.upper() for a in p.actions]),
_to_multi_line(sorted(roles)),
_dict_to_multi_line(p.required_tags),
2 changes: 2 additions & 0 deletions sdk/python/feast/feature_server.py
@@ -100,8 +100,10 @@ def async_refresh():
@asynccontextmanager
async def lifespan(app: FastAPI):
async_refresh()
await store.initialize()
yield
stop_refresh()
await store.close()

app = FastAPI(lifespan=lifespan)

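The `asynccontextmanager`-based lifespan hook is what makes this a one-time setup/teardown: FastAPI runs everything before the `yield` once at startup and everything after it once at shutdown. A self-contained sketch of the pattern (the `Store` stand-in below is illustrative, not Feast's `FeatureStore`):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI


class Store:
    """Stand-in for a feature store, just to keep the sketch runnable."""

    async def initialize(self) -> None:
        print("opening long-lived clients")  # e.g. connection pools

    async def close(self) -> None:
        print("closing long-lived clients")


store = Store()


@asynccontextmanager
async def lifespan(app: FastAPI):
    await store.initialize()  # startup: before the first request is served
    yield                     # the application handles requests here
    await store.close()       # shutdown: after the last request completes


app = FastAPI(lifespan=lifespan)
```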
12 changes: 10 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -713,7 +713,7 @@ def plan(
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> driver = Entity(name="driver_id", description="driver id")
>>> driver_hourly_stats = FileSource(
... path="project/feature_repo/data/driver_stats.parquet",
... path="data/driver_stats.parquet",
... timestamp_field="event_timestamp",
... created_timestamp_column="created",
... )
@@ -827,7 +827,7 @@ def apply(
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> driver = Entity(name="driver_id", description="driver id")
>>> driver_hourly_stats = FileSource(
... path="project/feature_repo/data/driver_stats.parquet",
... path="data/driver_stats.parquet",
... timestamp_field="event_timestamp",
... created_timestamp_column="created",
... )
@@ -2078,6 +2078,14 @@ def list_saved_datasets(
self.project, allow_cache=allow_cache, tags=tags
)

async def initialize(self) -> None:
"""Initialize long-lived clients and/or resources needed for accessing datastores"""
await self._get_provider().initialize(self.config)

async def close(self) -> None:
"""Cleanup any long-lived clients and/or resources"""
await self._get_provider().close()

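A sketch of how a caller would be expected to drive the new lifecycle pair (the repo path is illustrative):

```python
import asyncio

from feast import FeatureStore


async def main() -> None:
    store = FeatureStore(repo_path="project/feature_repo")  # illustrative path
    await store.initialize()  # open the provider's long-lived clients once
    try:
        ...  # serve online/offline reads with the initialized store
    finally:
        await store.close()  # always release the provider's resources


asyncio.run(main())
```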

def _print_materialization_log(
start_date, end_date, num_feature_views: int, online_store: str
40 changes: 33 additions & 7 deletions sdk/python/feast/infra/offline_stores/dask.py
@@ -57,6 +57,7 @@ def __init__(
self,
evaluation_function: Callable,
full_feature_names: bool,
repo_path: str,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):
@@ -67,6 +68,7 @@ def __init__(
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._metadata = metadata
self.repo_path = repo_path

@property
def full_feature_names(self) -> bool:
@@ -99,8 +101,13 @@ def persist(
if not allow_overwrite and os.path.exists(storage.file_options.uri):
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

if not Path(storage.file_options.uri).is_absolute():
absolute_path = Path(self.repo_path) / storage.file_options.uri
else:
absolute_path = Path(storage.file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
str(absolute_path),
storage.file_options.s3_endpoint_override,
)

@@ -243,7 +250,9 @@ def evaluate_historical_retrieval():

all_join_keys = list(set(all_join_keys + join_keys))

df_to_join = _read_datasource(feature_view.batch_source)
df_to_join = _read_datasource(
feature_view.batch_source, config.repo_path
)

df_to_join, timestamp_field = _field_mapping(
df_to_join,
@@ -297,6 +306,7 @@ def evaluate_historical_retrieval():
min_event_timestamp=entity_df_event_timestamp_range[0],
max_event_timestamp=entity_df_event_timestamp_range[1],
),
repo_path=str(config.repo_path),
)
return job

@@ -316,7 +326,7 @@ def pull_latest_from_table_or_query(

# Create lazy function that is only called from the RetrievalJob object
def evaluate_offline_job():
source_df = _read_datasource(data_source)
source_df = _read_datasource(data_source, config.repo_path)

source_df = _normalize_timestamp(
source_df, timestamp_field, created_timestamp_column
@@ -377,6 +387,7 @@ def evaluate_offline_job():
return DaskRetrievalJob(
evaluation_function=evaluate_offline_job,
full_feature_names=False,
repo_path=str(config.repo_path),
)

@staticmethod
@@ -420,8 +431,13 @@ def write_logged_features(
# Since this code will be mostly used from Go-created thread, it's better to avoid producing new threads
data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False)

if config.repo_path is not None and not Path(destination.path).is_absolute():
absolute_path = config.repo_path / destination.path
else:
absolute_path = Path(destination.path)

filesystem, path = FileSource.create_filesystem_and_path(
destination.path,
str(absolute_path),
destination.s3_endpoint_override,
)

@@ -456,8 +472,14 @@ def offline_write_batch(
)

file_options = feature_view.batch_source.file_options

if config.repo_path is not None and not Path(file_options.uri).is_absolute():
absolute_path = config.repo_path / file_options.uri
else:
absolute_path = Path(file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
file_options.uri, file_options.s3_endpoint_override
str(absolute_path), file_options.s3_endpoint_override
)
prev_table = pyarrow.parquet.read_table(
path, filesystem=filesystem, memory_map=True
@@ -493,7 +515,7 @@ def _get_entity_df_event_timestamp_range(
)


def _read_datasource(data_source) -> dd.DataFrame:
def _read_datasource(data_source, repo_path) -> dd.DataFrame:
storage_options = (
{
"client_kwargs": {
@@ -504,8 +526,12 @@ def _read_datasource(data_source) -> dd.DataFrame:
else None
)

if not Path(data_source.path).is_absolute():
path = repo_path / data_source.path
else:
path = data_source.path
return dd.read_parquet(
data_source.path,
path,
storage_options=storage_options,
)

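This resolve-relative-to-the-repo-root check now appears in `persist`, `write_logged_features`, `offline_write_batch`, and `_read_datasource`. A hypothetical follow-up (not part of this PR) could centralize it in one helper:

```python
from pathlib import Path
from typing import Optional, Union


def _resolve_uri(uri: str, repo_path: Optional[Union[str, Path]]) -> Path:
    """Interpret a relative URI against the repo root; leave absolute
    paths, and paths with no known repo root, untouched."""
    path = Path(uri)
    if repo_path is not None and not path.is_absolute():
        return Path(repo_path) / path
    return path
```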
17 changes: 14 additions & 3 deletions sdk/python/feast/infra/offline_stores/duckdb.py
@@ -27,7 +27,7 @@
from feast.repo_config import FeastConfigBaseModel, RepoConfig


def _read_data_source(data_source: DataSource) -> Table:
def _read_data_source(data_source: DataSource, repo_path: str) -> Table:
assert isinstance(data_source, FileSource)

if isinstance(data_source.file_format, ParquetFormat):
@@ -43,21 +43,32 @@ def _read_data_source(data_source: DataSource) -> Table:
def _write_data_source(
table: Table,
data_source: DataSource,
repo_path: str,
mode: str = "append",
allow_overwrite: bool = False,
):
assert isinstance(data_source, FileSource)

file_options = data_source.file_options

if mode == "overwrite" and not allow_overwrite and os.path.exists(file_options.uri):
if not Path(file_options.uri).is_absolute():
absolute_path = Path(repo_path) / file_options.uri
else:
absolute_path = Path(file_options.uri)

if (
mode == "overwrite"
and not allow_overwrite
and os.path.exists(str(absolute_path))
):
raise SavedDatasetLocationAlreadyExists(location=file_options.uri)

if isinstance(data_source.file_format, ParquetFormat):
if mode == "overwrite":
table = table.to_pyarrow()

filesystem, path = FileSource.create_filesystem_and_path(
file_options.uri,
str(absolute_path),
file_options.s3_endpoint_override,
)

11 changes: 10 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -1,3 +1,4 @@
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple

import pyarrow
@@ -154,8 +155,16 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
if (
config.repo_path is not None
and not Path(self.file_options.uri).is_absolute()
):
absolute_path = config.repo_path / self.file_options.uri
else:
absolute_path = Path(self.file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
self.path, self.file_options.s3_endpoint_override
str(absolute_path), self.file_options.s3_endpoint_override
)

# TODO why None check necessary