Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion sdk/python/feast/cli/cli.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
# limitations under the License.
import json
import logging
-from datetime import datetime
+from datetime import datetime, timedelta
from importlib.metadata import version as importlib_version
from pathlib import Path
from typing import List, Optional
@@ -366,6 +366,24 @@ def registry_dump_command(ctx: click.Context):
default=None,
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
)
@click.option(
"--chunk-hours",
type=int,
default=None,
help="Split materialization into N-hour chunks to reduce memory pressure.",
)
@click.option(
"--chunk-minutes",
type=int,
default=None,
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
)
@click.option(
"--chunk-seconds",
type=int,
default=None,
help="Split materialization into N-second chunks (finest granularity).",
)
@click.pass_context
def materialize_command(
ctx: click.Context,
@@ -374,6 +392,9 @@ def materialize_command(
views: List[str],
disable_event_timestamp: bool,
feature_view_version: Optional[str],
chunk_hours: Optional[int],
chunk_minutes: Optional[int],
chunk_seconds: Optional[int],
):
"""
Run a (non-incremental) materialization job to ingest data into the online store. Feast
@@ -387,6 +408,24 @@ def materialize_command(
"""
store = create_feature_store(ctx)

if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
raise click.UsageError(
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
)
chunk_size: Optional[timedelta] = None
if chunk_hours is not None:
if chunk_hours <= 0:
raise click.UsageError("--chunk-hours must be a positive integer.")
chunk_size = timedelta(hours=chunk_hours)
elif chunk_minutes is not None:
if chunk_minutes <= 0:
raise click.UsageError("--chunk-minutes must be a positive integer.")
chunk_size = timedelta(minutes=chunk_minutes)
elif chunk_seconds is not None:
if chunk_seconds <= 0:
raise click.UsageError("--chunk-seconds must be a positive integer.")
chunk_size = timedelta(seconds=chunk_seconds)

if disable_event_timestamp:
if start_ts or end_ts:
raise click.UsageError(
@@ -412,6 +451,7 @@ def materialize_command(
end_date=end_date,
disable_event_timestamp=disable_event_timestamp,
version=feature_view_version,
chunk_size=chunk_size,
)


@@ -429,12 +469,33 @@ def materialize_command(
default=None,
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
)
@click.option(
"--chunk-hours",
type=int,
default=None,
help="Split materialization into N-hour chunks to reduce memory pressure.",
)
@click.option(
"--chunk-minutes",
type=int,
default=None,
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
)
@click.option(
"--chunk-seconds",
type=int,
default=None,
help="Split materialization into N-second chunks (finest granularity).",
)
@click.pass_context
def materialize_incremental_command(
ctx: click.Context,
end_ts: str,
views: List[str],
feature_view_version: Optional[str],
chunk_hours: Optional[int],
chunk_minutes: Optional[int],
chunk_seconds: Optional[int],
):
"""
Run an incremental materialization job to ingest new data into the online store. Feast will read
@@ -445,10 +506,30 @@ def materialize_incremental_command(
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
store = create_feature_store(ctx)

if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
raise click.UsageError(
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
)
chunk_size: Optional[timedelta] = None
if chunk_hours is not None:
if chunk_hours <= 0:
raise click.UsageError("--chunk-hours must be a positive integer.")
chunk_size = timedelta(hours=chunk_hours)
elif chunk_minutes is not None:
if chunk_minutes <= 0:
raise click.UsageError("--chunk-minutes must be a positive integer.")
chunk_size = timedelta(minutes=chunk_minutes)
elif chunk_seconds is not None:
if chunk_seconds <= 0:
raise click.UsageError("--chunk-seconds must be a positive integer.")
chunk_size = timedelta(seconds=chunk_seconds)

store.materialize_incremental(
feature_views=None if not views else views,
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
version=feature_view_version,
chunk_size=chunk_size,
)


Expand Down
Loading
Loading