Skip to content

Commit e44dcb0

Browse files
feat: Add progress bar to CLI from feast apply
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent f6116f9 commit e44dcb0

File tree

5 files changed

+219
-22
lines changed

5 files changed

+219
-22
lines changed

sdk/python/feast/cli/cli.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from colorama import Fore, Style
2424
from dateutil import parser
2525
from pygments import formatters, highlight, lexers
26+
from tqdm import tqdm
2627

2728
from feast import utils
2829
from feast.cli.data_sources import data_sources_cmd
@@ -270,9 +271,17 @@ def plan_command(
270271
is_flag=True,
271272
help="Don't validate feature views. Use with caution as this skips important checks.",
272273
)
274+
@click.option(
275+
"--no-progress",
276+
is_flag=True,
277+
help="Disable progress bars during apply operation.",
278+
)
273279
@click.pass_context
274280
def apply_total_command(
275-
ctx: click.Context, skip_source_validation: bool, skip_feature_view_validation: bool
281+
ctx: click.Context,
282+
skip_source_validation: bool,
283+
skip_feature_view_validation: bool,
284+
no_progress: bool,
276285
):
277286
"""
278287
Create or update a feature store deployment
@@ -282,9 +291,21 @@ def apply_total_command(
282291
cli_check_repo(repo, fs_yaml_file)
283292

284293
repo_config = load_repo_config(repo, fs_yaml_file)
294+
295+
# Create tqdm_builder for progress tracking
296+
tqdm_builder = None
297+
if not no_progress:
298+
299+
def tqdm_builder(length):
300+
return tqdm(total=length, ncols=100)
301+
285302
try:
286303
apply_total(
287-
repo_config, repo, skip_source_validation, skip_feature_view_validation
304+
repo_config,
305+
repo,
306+
skip_source_validation,
307+
skip_feature_view_validation,
308+
tqdm_builder=tqdm_builder,
288309
)
289310
except FeastProviderLoginError as e:
290311
print(str(e))
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""
2+
Progress tracking infrastructure for feast apply operations.
3+
4+
This module provides the ApplyProgressContext class that manages progress bars
5+
during apply operations, following the same tqdm_builder pattern used in
6+
materialization operations.
7+
"""
8+
9+
from dataclasses import dataclass
10+
from typing import Callable, Optional
11+
12+
from tqdm import tqdm
13+
14+
15+
@dataclass
16+
class ApplyProgressContext:
17+
"""
18+
Context object for tracking progress during feast apply operations.
19+
20+
This class manages progress bars for the main phases of apply:
21+
1. Planning changes (computing diffs)
22+
2. Updating infrastructure (table creation/deletion)
23+
3. Updating registry (metadata updates)
24+
25+
Follows the same tqdm_builder pattern used throughout Feast for consistency.
26+
"""
27+
28+
tqdm_builder: Callable[[int], tqdm]
29+
current_phase: str = ""
30+
overall_progress: Optional[tqdm] = None
31+
phase_progress: Optional[tqdm] = None
32+
33+
# Phase tracking
34+
total_phases: int = 3
35+
completed_phases: int = 0
36+
37+
# Infrastructure operation tracking
38+
total_infra_operations: int = 0
39+
completed_infra_operations: int = 0
40+
41+
def start_overall_progress(self):
42+
"""Initialize the overall progress bar for apply phases."""
43+
if self.overall_progress is None:
44+
self.overall_progress = self.tqdm_builder(self.total_phases)
45+
self.overall_progress.set_description("Applying changes")
46+
47+
def start_phase(self, phase_name: str, operations_count: int = 0):
48+
"""
49+
Start tracking a new phase.
50+
51+
Args:
52+
phase_name: Human-readable name of the phase
53+
operations_count: Number of operations in this phase (0 for unknown)
54+
"""
55+
self.current_phase = phase_name
56+
if operations_count > 0:
57+
self.phase_progress = self.tqdm_builder(operations_count)
58+
self.phase_progress.set_description(f"{phase_name}")
59+
60+
def update_phase_progress(self, description: Optional[str] = None):
61+
"""
62+
Update progress within the current phase.
63+
64+
Args:
65+
description: Optional description of current operation
66+
"""
67+
if self.phase_progress:
68+
self.phase_progress.update(1)
69+
if description:
70+
self.phase_progress.set_description(
71+
f"{self.current_phase}: {description}"
72+
)
73+
74+
def complete_phase(self):
75+
"""Mark current phase as complete and advance overall progress."""
76+
if self.phase_progress:
77+
self.phase_progress.close()
78+
self.phase_progress = None
79+
if self.overall_progress:
80+
self.overall_progress.update(1)
81+
self.completed_phases += 1
82+
83+
def cleanup(self):
84+
"""Clean up all progress bars. Should be called in finally blocks."""
85+
if self.phase_progress:
86+
self.phase_progress.close()
87+
self.phase_progress = None
88+
if self.overall_progress:
89+
self.overall_progress.close()
90+
self.overall_progress = None

sdk/python/feast/diff/infra_diff.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from dataclasses import dataclass
2-
from typing import Generic, Iterable, List, Optional, Tuple, TypeVar
2+
from typing import TYPE_CHECKING, Generic, Iterable, List, Optional, Tuple, TypeVar
3+
4+
if TYPE_CHECKING:
5+
from feast.diff.apply_progress import ApplyProgressContext
36

47
from feast.diff.property_diff import PropertyDiff, TransitionType
58
from feast.infra.infra_object import (
@@ -33,8 +36,9 @@ class InfraDiff:
3336
def __init__(self):
3437
self.infra_object_diffs = []
3538

36-
def update(self):
39+
def update(self, progress_ctx: Optional["ApplyProgressContext"] = None):
3740
"""Apply the infrastructure changes specified in this object."""
41+
3842
for infra_object_diff in self.infra_object_diffs:
3943
if infra_object_diff.transition_type in [
4044
TransitionType.DELETE,
@@ -43,6 +47,10 @@ def update(self):
4347
infra_object = InfraObject.from_proto(
4448
infra_object_diff.current_infra_object
4549
)
50+
if progress_ctx:
51+
progress_ctx.update_phase_progress(
52+
f"Tearing down {infra_object_diff.name}"
53+
)
4654
infra_object.teardown()
4755
elif infra_object_diff.transition_type in [
4856
TransitionType.CREATE,
@@ -51,8 +59,19 @@ def update(self):
5159
infra_object = InfraObject.from_proto(
5260
infra_object_diff.new_infra_object
5361
)
62+
if progress_ctx:
63+
progress_ctx.update_phase_progress(
64+
f"Creating/updating {infra_object_diff.name}"
65+
)
5466
infra_object.update()
5567

68+
# Update progress after each operation (except unchanged operations)
69+
if (
70+
progress_ctx
71+
and infra_object_diff.transition_type != TransitionType.UNCHANGED
72+
):
73+
progress_ctx.update_phase_progress()
74+
5675
def to_string(self):
5776
from colorama import Fore, Style
5877

sdk/python/feast/feature_store.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from datetime import datetime, timedelta
1919
from pathlib import Path
2020
from typing import (
21+
TYPE_CHECKING,
2122
Any,
2223
Callable,
2324
Dict,
@@ -31,6 +32,9 @@
3132
cast,
3233
)
3334

35+
if TYPE_CHECKING:
36+
from feast.diff.apply_progress import ApplyProgressContext
37+
3438
import pandas as pd
3539
import pyarrow as pa
3640
from colorama import Fore, Style
@@ -726,6 +730,7 @@ def plan(
726730
self,
727731
desired_repo_contents: RepoContents,
728732
skip_feature_view_validation: bool = False,
733+
progress_ctx: Optional["ApplyProgressContext"] = None,
729734
) -> Tuple[RegistryDiff, InfraDiff, Infra]:
730735
"""Dry-run registering objects to metadata store.
731736
@@ -793,6 +798,9 @@ def plan(
793798
self._registry, self.project, desired_repo_contents
794799
)
795800

801+
if progress_ctx:
802+
progress_ctx.update_phase_progress("Computing infrastructure diff")
803+
796804
# Compute the desired difference between the current infra, as stored in the registry,
797805
# and the desired infra.
798806
self._registry.refresh(project=self.project)
@@ -807,22 +815,45 @@ def plan(
807815
return registry_diff, infra_diff, new_infra
808816

809817
def _apply_diffs(
810-
self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra
818+
self,
819+
registry_diff: RegistryDiff,
820+
infra_diff: InfraDiff,
821+
new_infra: Infra,
822+
progress_ctx: Optional["ApplyProgressContext"] = None,
811823
):
812824
"""Applies the given diffs to the metadata store and infrastructure.
813825
814826
Args:
815827
registry_diff: The diff between the current registry and the desired registry.
816828
infra_diff: The diff between the current infra and the desired infra.
817829
new_infra: The desired infra.
830+
progress_ctx: Optional progress context for tracking apply progress.
818831
"""
819-
infra_diff.update()
832+
# Infrastructure phase
833+
if progress_ctx:
834+
infra_ops_count = len(infra_diff.infra_object_diffs)
835+
progress_ctx.start_phase("Updating infrastructure", infra_ops_count)
836+
837+
infra_diff.update(progress_ctx=progress_ctx)
838+
839+
if progress_ctx:
840+
progress_ctx.complete_phase()
841+
progress_ctx.start_phase("Updating registry", 2)
842+
843+
# Registry phase
820844
apply_diff_to_registry(
821845
self._registry, registry_diff, self.project, commit=False
822846
)
823847

848+
if progress_ctx:
849+
progress_ctx.update_phase_progress("Committing registry changes")
850+
824851
self._registry.update_infra(new_infra, self.project, commit=True)
825852

853+
if progress_ctx:
854+
progress_ctx.update_phase_progress("Registry update complete")
855+
progress_ctx.complete_phase()
856+
826857
def apply(
827858
self,
828859
objects: Union[

sdk/python/feast/repo_operations.py

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
from importlib.abc import Loader
1111
from importlib.machinery import ModuleSpec
1212
from pathlib import Path
13-
from typing import List, Optional, Set, Union
13+
from typing import Callable, List, Optional, Set, Union
1414

1515
import click
1616
from click.exceptions import BadParameter
17+
from tqdm import tqdm
1718

1819
from feast import PushSource
1920
from feast.batch_feature_view import BatchFeatureView
@@ -342,6 +343,7 @@ def apply_total_with_repo_instance(
342343
repo: RepoContents,
343344
skip_source_validation: bool,
344345
skip_feature_view_validation: bool = False,
346+
tqdm_builder: Optional[Callable[[int], tqdm]] = None,
345347
):
346348
if not skip_source_validation:
347349
provider = store._get_provider()
@@ -358,22 +360,54 @@ def apply_total_with_repo_instance(
358360
views_to_delete,
359361
) = extract_objects_for_apply_delete(project_name, registry, repo)
360362

361-
if store._should_use_plan():
362-
registry_diff, infra_diff, new_infra = store.plan(
363-
repo, skip_feature_view_validation=skip_feature_view_validation
364-
)
365-
click.echo(registry_diff.to_string())
363+
# Create progress context if tqdm_builder is provided
364+
progress_ctx = None
365+
if tqdm_builder:
366+
from feast.diff.apply_progress import ApplyProgressContext
366367

367-
store._apply_diffs(registry_diff, infra_diff, new_infra)
368-
click.echo(infra_diff.to_string())
369-
else:
370-
store.apply(
371-
all_to_apply,
372-
objects_to_delete=all_to_delete,
373-
partial=False,
374-
skip_feature_view_validation=skip_feature_view_validation,
375-
)
376-
log_infra_changes(views_to_keep, views_to_delete)
368+
progress_ctx = ApplyProgressContext(tqdm_builder=tqdm_builder)
369+
progress_ctx.start_overall_progress()
370+
371+
try:
372+
if store._should_use_plan():
373+
# Planning phase
374+
if progress_ctx:
375+
progress_ctx.start_phase("Planning changes", 1)
376+
377+
registry_diff, infra_diff, new_infra = store.plan(
378+
repo,
379+
skip_feature_view_validation=skip_feature_view_validation,
380+
progress_ctx=progress_ctx,
381+
)
382+
click.echo(registry_diff.to_string())
383+
384+
if progress_ctx:
385+
progress_ctx.complete_phase()
386+
387+
# Apply phase
388+
store._apply_diffs(
389+
registry_diff, infra_diff, new_infra, progress_ctx=progress_ctx
390+
)
391+
click.echo(infra_diff.to_string())
392+
else:
393+
# Legacy apply path - simple single phase tracking
394+
if progress_ctx:
395+
progress_ctx.start_phase("Applying changes", 1)
396+
397+
store.apply(
398+
all_to_apply,
399+
objects_to_delete=all_to_delete,
400+
partial=False,
401+
skip_feature_view_validation=skip_feature_view_validation,
402+
)
403+
log_infra_changes(views_to_keep, views_to_delete)
404+
405+
if progress_ctx:
406+
progress_ctx.update_phase_progress()
407+
progress_ctx.complete_phase()
408+
finally:
409+
if progress_ctx:
410+
progress_ctx.cleanup()
377411

378412

379413
def log_infra_changes(
@@ -416,6 +450,7 @@ def apply_total(
416450
repo_path: Path,
417451
skip_source_validation: bool,
418452
skip_feature_view_validation: bool = False,
453+
tqdm_builder: Optional[Callable[[int], tqdm]] = None,
419454
):
420455
os.chdir(repo_path)
421456
repo = _get_repo_contents(repo_path, repo_config.project, repo_config)
@@ -437,6 +472,7 @@ def apply_total(
437472
repo,
438473
skip_source_validation,
439474
skip_feature_view_validation,
475+
tqdm_builder=tqdm_builder,
440476
)
441477

442478

0 commit comments

Comments
 (0)