1313# limitations under the License.
1414import json
1515import logging
16- from datetime import datetime
16+ from datetime import datetime , timedelta
1717from importlib .metadata import version as importlib_version
1818from pathlib import Path
1919from typing import List , Optional
@@ -366,6 +366,24 @@ def registry_dump_command(ctx: click.Context):
366366 default = None ,
367367 help = "Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view." ,
368368)
369+ @click .option (
370+ "--chunk-hours" ,
371+ type = int ,
372+ default = None ,
373+ help = "Split materialization into N-hour chunks to reduce memory pressure." ,
374+ )
375+ @click .option (
376+ "--chunk-minutes" ,
377+ type = int ,
378+ default = None ,
379+ help = "Split materialization into N-minute chunks (finer granularity than --chunk-hours)." ,
380+ )
381+ @click .option (
382+ "--chunk-seconds" ,
383+ type = int ,
384+ default = None ,
385+ help = "Split materialization into N-second chunks (finest granularity)." ,
386+ )
369387@click .pass_context
370388def materialize_command (
371389 ctx : click .Context ,
@@ -374,6 +392,9 @@ def materialize_command(
374392 views : List [str ],
375393 disable_event_timestamp : bool ,
376394 feature_view_version : Optional [str ],
395+ chunk_hours : Optional [int ],
396+ chunk_minutes : Optional [int ],
397+ chunk_seconds : Optional [int ],
377398):
378399 """
379400 Run a (non-incremental) materialization job to ingest data into the online store. Feast
@@ -387,6 +408,24 @@ def materialize_command(
387408 """
388409 store = create_feature_store (ctx )
389410
411+ if sum (v is not None for v in [chunk_hours , chunk_minutes , chunk_seconds ]) > 1 :
412+ raise click .UsageError (
413+ "Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
414+ )
415+ chunk_size : Optional [timedelta ] = None
416+ if chunk_hours is not None :
417+ if chunk_hours <= 0 :
418+ raise click .UsageError ("--chunk-hours must be a positive integer." )
419+ chunk_size = timedelta (hours = chunk_hours )
420+ elif chunk_minutes is not None :
421+ if chunk_minutes <= 0 :
422+ raise click .UsageError ("--chunk-minutes must be a positive integer." )
423+ chunk_size = timedelta (minutes = chunk_minutes )
424+ elif chunk_seconds is not None :
425+ if chunk_seconds <= 0 :
426+ raise click .UsageError ("--chunk-seconds must be a positive integer." )
427+ chunk_size = timedelta (seconds = chunk_seconds )
428+
390429 if disable_event_timestamp :
391430 if start_ts or end_ts :
392431 raise click .UsageError (
@@ -412,6 +451,7 @@ def materialize_command(
412451 end_date = end_date ,
413452 disable_event_timestamp = disable_event_timestamp ,
414453 version = feature_view_version ,
454+ chunk_size = chunk_size ,
415455 )
416456
417457
@@ -429,12 +469,33 @@ def materialize_command(
429469 default = None ,
430470 help = "Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view." ,
431471)
472+ @click .option (
473+ "--chunk-hours" ,
474+ type = int ,
475+ default = None ,
476+ help = "Split materialization into N-hour chunks to reduce memory pressure." ,
477+ )
478+ @click .option (
479+ "--chunk-minutes" ,
480+ type = int ,
481+ default = None ,
482+ help = "Split materialization into N-minute chunks (finer granularity than --chunk-hours)." ,
483+ )
484+ @click .option (
485+ "--chunk-seconds" ,
486+ type = int ,
487+ default = None ,
488+ help = "Split materialization into N-second chunks (finest granularity)." ,
489+ )
432490@click .pass_context
433491def materialize_incremental_command (
434492 ctx : click .Context ,
435493 end_ts : str ,
436494 views : List [str ],
437495 feature_view_version : Optional [str ],
496+ chunk_hours : Optional [int ],
497+ chunk_minutes : Optional [int ],
498+ chunk_seconds : Optional [int ],
438499):
439500 """
440501 Run an incremental materialization job to ingest new data into the online store. Feast will read
@@ -445,10 +506,30 @@ def materialize_incremental_command(
445506 END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
446507 """
447508 store = create_feature_store (ctx )
509+
510+ if sum (v is not None for v in [chunk_hours , chunk_minutes , chunk_seconds ]) > 1 :
511+ raise click .UsageError (
512+ "Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
513+ )
514+ chunk_size : Optional [timedelta ] = None
515+ if chunk_hours is not None :
516+ if chunk_hours <= 0 :
517+ raise click .UsageError ("--chunk-hours must be a positive integer." )
518+ chunk_size = timedelta (hours = chunk_hours )
519+ elif chunk_minutes is not None :
520+ if chunk_minutes <= 0 :
521+ raise click .UsageError ("--chunk-minutes must be a positive integer." )
522+ chunk_size = timedelta (minutes = chunk_minutes )
523+ elif chunk_seconds is not None :
524+ if chunk_seconds <= 0 :
525+ raise click .UsageError ("--chunk-seconds must be a positive integer." )
526+ chunk_size = timedelta (seconds = chunk_seconds )
527+
448528 store .materialize_incremental (
449529 feature_views = None if not views else views ,
450530 end_date = utils .make_tzaware (datetime .fromisoformat (end_ts )),
451531 version = feature_view_version ,
532+ chunk_size = chunk_size ,
452533 )
453534
454535
0 commit comments