Skip to content

Commit 472f9c4

Browse files
feat(materialization): add time chunks management
Signed-off-by: Alan Gauthier <alan.gauthier@jobteaser.com>
1 parent d41becf commit 472f9c4

File tree

6 files changed

+954
-35
lines changed

6 files changed

+954
-35
lines changed

sdk/python/feast/cli/cli.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
import json
1515
import logging
16-
from datetime import datetime
16+
from datetime import datetime, timedelta
1717
from importlib.metadata import version as importlib_version
1818
from pathlib import Path
1919
from typing import List, Optional
@@ -366,6 +366,24 @@ def registry_dump_command(ctx: click.Context):
366366
default=None,
367367
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
368368
)
369+
@click.option(
370+
"--chunk-hours",
371+
type=int,
372+
default=None,
373+
help="Split materialization into N-hour chunks to reduce memory pressure.",
374+
)
375+
@click.option(
376+
"--chunk-minutes",
377+
type=int,
378+
default=None,
379+
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
380+
)
381+
@click.option(
382+
"--chunk-seconds",
383+
type=int,
384+
default=None,
385+
help="Split materialization into N-second chunks (finest granularity).",
386+
)
369387
@click.pass_context
370388
def materialize_command(
371389
ctx: click.Context,
@@ -374,6 +392,9 @@ def materialize_command(
374392
views: List[str],
375393
disable_event_timestamp: bool,
376394
feature_view_version: Optional[str],
395+
chunk_hours: Optional[int],
396+
chunk_minutes: Optional[int],
397+
chunk_seconds: Optional[int],
377398
):
378399
"""
379400
Run a (non-incremental) materialization job to ingest data into the online store. Feast
@@ -387,6 +408,24 @@ def materialize_command(
387408
"""
388409
store = create_feature_store(ctx)
389410

411+
if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
412+
raise click.UsageError(
413+
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
414+
)
415+
chunk_size: Optional[timedelta] = None
416+
if chunk_hours is not None:
417+
if chunk_hours <= 0:
418+
raise click.UsageError("--chunk-hours must be a positive integer.")
419+
chunk_size = timedelta(hours=chunk_hours)
420+
elif chunk_minutes is not None:
421+
if chunk_minutes <= 0:
422+
raise click.UsageError("--chunk-minutes must be a positive integer.")
423+
chunk_size = timedelta(minutes=chunk_minutes)
424+
elif chunk_seconds is not None:
425+
if chunk_seconds <= 0:
426+
raise click.UsageError("--chunk-seconds must be a positive integer.")
427+
chunk_size = timedelta(seconds=chunk_seconds)
428+
390429
if disable_event_timestamp:
391430
if start_ts or end_ts:
392431
raise click.UsageError(
@@ -412,6 +451,7 @@ def materialize_command(
412451
end_date=end_date,
413452
disable_event_timestamp=disable_event_timestamp,
414453
version=feature_view_version,
454+
chunk_size=chunk_size,
415455
)
416456

417457

@@ -429,12 +469,33 @@ def materialize_command(
429469
default=None,
430470
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
431471
)
472+
@click.option(
473+
"--chunk-hours",
474+
type=int,
475+
default=None,
476+
help="Split materialization into N-hour chunks to reduce memory pressure.",
477+
)
478+
@click.option(
479+
"--chunk-minutes",
480+
type=int,
481+
default=None,
482+
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
483+
)
484+
@click.option(
485+
"--chunk-seconds",
486+
type=int,
487+
default=None,
488+
help="Split materialization into N-second chunks (finest granularity).",
489+
)
432490
@click.pass_context
433491
def materialize_incremental_command(
434492
ctx: click.Context,
435493
end_ts: str,
436494
views: List[str],
437495
feature_view_version: Optional[str],
496+
chunk_hours: Optional[int],
497+
chunk_minutes: Optional[int],
498+
chunk_seconds: Optional[int],
438499
):
439500
"""
440501
Run an incremental materialization job to ingest new data into the online store. Feast will read
@@ -445,10 +506,30 @@ def materialize_incremental_command(
445506
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
446507
"""
447508
store = create_feature_store(ctx)
509+
510+
if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
511+
raise click.UsageError(
512+
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
513+
)
514+
chunk_size: Optional[timedelta] = None
515+
if chunk_hours is not None:
516+
if chunk_hours <= 0:
517+
raise click.UsageError("--chunk-hours must be a positive integer.")
518+
chunk_size = timedelta(hours=chunk_hours)
519+
elif chunk_minutes is not None:
520+
if chunk_minutes <= 0:
521+
raise click.UsageError("--chunk-minutes must be a positive integer.")
522+
chunk_size = timedelta(minutes=chunk_minutes)
523+
elif chunk_seconds is not None:
524+
if chunk_seconds <= 0:
525+
raise click.UsageError("--chunk-seconds must be a positive integer.")
526+
chunk_size = timedelta(seconds=chunk_seconds)
527+
448528
store.materialize_incremental(
449529
feature_views=None if not views else views,
450530
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
451531
version=feature_view_version,
532+
chunk_size=chunk_size,
452533
)
453534

454535

0 commit comments

Comments
 (0)