diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py index ab756d47496..6667a8464d9 100644 --- a/sdk/python/feast/cli/cli.py +++ b/sdk/python/feast/cli/cli.py @@ -270,9 +270,17 @@ def plan_command( is_flag=True, help="Don't validate feature views. Use with caution as this skips important checks.", ) +@click.option( + "--no-progress", + is_flag=True, + help="Disable progress bars during apply operation.", +) @click.pass_context def apply_total_command( - ctx: click.Context, skip_source_validation: bool, skip_feature_view_validation: bool + ctx: click.Context, + skip_source_validation: bool, + skip_feature_view_validation: bool, + no_progress: bool, ): """ Create or update a feature store deployment @@ -282,9 +290,19 @@ def apply_total_command( cli_check_repo(repo, fs_yaml_file) repo_config = load_repo_config(repo, fs_yaml_file) + + # Set environment variable to disable progress if requested + if no_progress: + import os + + os.environ["FEAST_NO_PROGRESS"] = "1" + try: apply_total( - repo_config, repo, skip_source_validation, skip_feature_view_validation + repo_config, + repo, + skip_source_validation, + skip_feature_view_validation, ) except FeastProviderLoginError as e: print(str(e)) diff --git a/sdk/python/feast/diff/apply_progress.py b/sdk/python/feast/diff/apply_progress.py new file mode 100644 index 00000000000..7d12ecef018 --- /dev/null +++ b/sdk/python/feast/diff/apply_progress.py @@ -0,0 +1,185 @@ +""" +Enhanced progress tracking infrastructure for feast apply operations. + +This module provides the ApplyProgressContext class that manages positioned, +color-coded progress bars during apply operations with fixed-width formatting +for perfect alignment. +""" + +from dataclasses import dataclass +from typing import Optional + +from tqdm import tqdm + +try: + from feast.diff.progress_utils import ( + create_positioned_tqdm, + get_color_for_phase, + is_tty_available, + ) + + _PROGRESS_UTILS_AVAILABLE = True +except ImportError: + # Graceful fallback when progress_utils is not available (e.g., in tests) + _PROGRESS_UTILS_AVAILABLE = False + + def create_positioned_tqdm( + position: int, + description: str, + total: int, + color: str = "blue", + postfix: Optional[str] = None, + ) -> Optional[tqdm]: + return None + + def get_color_for_phase(phase: str) -> str: + return "blue" + + def is_tty_available() -> bool: + return False + + +@dataclass +class ApplyProgressContext: + """ + Enhanced context object for tracking progress during feast apply operations. + + This class manages multiple positioned progress bars with fixed-width formatting: + 1. Overall progress (position 0) - tracks main phases + 2. Phase progress (position 1) - tracks operations within current phase + + Features: + - Fixed-width alignment for perfect visual consistency + - Color-coded progress bars by phase + - Position coordination to prevent overlap + - TTY detection for CI/CD compatibility + """ + + # Core tracking state + current_phase: str = "" + overall_progress: Optional[tqdm] = None + phase_progress: Optional[tqdm] = None + + # Progress tracking + total_phases: int = 3 + completed_phases: int = 0 + tty_available: bool = True + + # Position allocation + OVERALL_POSITION = 0 + PHASE_POSITION = 1 + + def __post_init__(self): + """Initialize TTY detection after dataclass creation.""" + self.tty_available = _PROGRESS_UTILS_AVAILABLE and is_tty_available() + + def start_overall_progress(self): + """Initialize the overall progress bar for apply phases.""" + if not self.tty_available: + return + + if self.overall_progress is None: + try: + self.overall_progress = create_positioned_tqdm( + position=self.OVERALL_POSITION, + description="Applying changes", + total=self.total_phases, + color=get_color_for_phase("overall"), + ) + except (TypeError, AttributeError): + # Handle case where fallback functions don't work as expected + self.overall_progress = None + + def start_phase(self, phase_name: str, operations_count: int = 0): + """ + Start tracking a new phase. + + Args: + phase_name: Human-readable name of the phase + operations_count: Number of operations in this phase (0 for unknown) + """ + if not self.tty_available: + return + + self.current_phase = phase_name + + # Close previous phase progress if exists + if self.phase_progress: + try: + self.phase_progress.close() + except (AttributeError, TypeError): + pass + self.phase_progress = None + + # Create new phase progress bar if operations are known + if operations_count > 0: + try: + self.phase_progress = create_positioned_tqdm( + position=self.PHASE_POSITION, + description=phase_name, + total=operations_count, + color=get_color_for_phase(phase_name.lower()), + ) + except (TypeError, AttributeError): + # Handle case where fallback functions don't work as expected + self.phase_progress = None + + def update_phase_progress(self, description: Optional[str] = None): + """ + Update progress within the current phase. + + Args: + description: Optional description of current operation + """ + if not self.tty_available or not self.phase_progress: + return + + try: + if description: + # Update postfix with current operation + self.phase_progress.set_postfix_str(description) + + self.phase_progress.update(1) + except (AttributeError, TypeError): + # Handle case where phase_progress is None or fallback function returned None + pass + + def complete_phase(self): + """Mark current phase as complete and advance overall progress.""" + if not self.tty_available: + return + + # Close phase progress + if self.phase_progress: + try: + self.phase_progress.close() + except (AttributeError, TypeError): + pass + self.phase_progress = None + + # Update overall progress + if self.overall_progress: + try: + self.overall_progress.update(1) + # Update postfix with phase completion + phase_text = f"({self.completed_phases + 1}/{self.total_phases} phases)" + self.overall_progress.set_postfix_str(phase_text) + except (AttributeError, TypeError): + pass + + self.completed_phases += 1 + + def cleanup(self): + """Clean up all progress bars. Should be called in finally blocks.""" + if self.phase_progress: + try: + self.phase_progress.close() + except (AttributeError, TypeError): + pass + self.phase_progress = None + if self.overall_progress: + try: + self.overall_progress.close() + except (AttributeError, TypeError): + pass + self.overall_progress = None diff --git a/sdk/python/feast/diff/infra_diff.py b/sdk/python/feast/diff/infra_diff.py index b761470905a..2fa9e8e882e 100644 --- a/sdk/python/feast/diff/infra_diff.py +++ b/sdk/python/feast/diff/infra_diff.py @@ -1,5 +1,8 @@ from dataclasses import dataclass -from typing import Generic, Iterable, List, Optional, Tuple, TypeVar +from typing import TYPE_CHECKING, Generic, Iterable, List, Optional, Tuple, TypeVar + +if TYPE_CHECKING: + from feast.diff.apply_progress import ApplyProgressContext from feast.diff.property_diff import PropertyDiff, TransitionType from feast.infra.infra_object import ( @@ -33,8 +36,9 @@ class InfraDiff: def __init__(self): self.infra_object_diffs = [] - def update(self): + def update(self, progress_ctx: Optional["ApplyProgressContext"] = None): """Apply the infrastructure changes specified in this object.""" + for infra_object_diff in self.infra_object_diffs: if infra_object_diff.transition_type in [ TransitionType.DELETE, @@ -43,6 +47,10 @@ def update(self): infra_object = InfraObject.from_proto( infra_object_diff.current_infra_object ) + if progress_ctx: + progress_ctx.update_phase_progress( + f"Tearing down {infra_object_diff.name}" + ) infra_object.teardown() elif infra_object_diff.transition_type in [ TransitionType.CREATE, @@ -51,6 +59,10 @@ def update(self): infra_object = InfraObject.from_proto( infra_object_diff.new_infra_object ) + if progress_ctx: + progress_ctx.update_phase_progress( + f"Creating/updating {infra_object_diff.name}" + ) infra_object.update() def to_string(self): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 0eff5034683..fc4517281d3 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,6 +18,7 @@ from datetime import datetime, timedelta from pathlib import Path from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -31,6 +32,9 @@ cast, ) +if TYPE_CHECKING: + from feast.diff.apply_progress import ApplyProgressContext + import pandas as pd import pyarrow as pa from colorama import Fore, Style @@ -726,6 +730,7 @@ def plan( self, desired_repo_contents: RepoContents, skip_feature_view_validation: bool = False, + progress_ctx: Optional["ApplyProgressContext"] = None, ) -> Tuple[RegistryDiff, InfraDiff, Infra]: """Dry-run registering objects to metadata store. @@ -793,6 +798,9 @@ def plan( self._registry, self.project, desired_repo_contents ) + if progress_ctx: + progress_ctx.update_phase_progress("Computing infrastructure diff") + # Compute the desired difference between the current infra, as stored in the registry, # and the desired infra. self._registry.refresh(project=self.project) @@ -807,7 +815,11 @@ def plan( return registry_diff, infra_diff, new_infra def _apply_diffs( - self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra + self, + registry_diff: RegistryDiff, + infra_diff: InfraDiff, + new_infra: Infra, + progress_ctx: Optional["ApplyProgressContext"] = None, ): """Applies the given diffs to the metadata store and infrastructure. @@ -815,13 +827,37 @@ def _apply_diffs( registry_diff: The diff between the current registry and the desired registry. infra_diff: The diff between the current infra and the desired infra. new_infra: The desired infra. + progress_ctx: Optional progress context for tracking apply progress. """ - infra_diff.update() - apply_diff_to_registry( - self._registry, registry_diff, self.project, commit=False - ) + try: + # Infrastructure phase + if progress_ctx: + infra_ops_count = len(infra_diff.infra_object_diffs) + progress_ctx.start_phase("Updating infrastructure", infra_ops_count) + + infra_diff.update(progress_ctx=progress_ctx) + + if progress_ctx: + progress_ctx.complete_phase() + progress_ctx.start_phase("Updating registry", 2) + + # Registry phase + apply_diff_to_registry( + self._registry, registry_diff, self.project, commit=False + ) + + if progress_ctx: + progress_ctx.update_phase_progress("Committing registry changes") + + self._registry.update_infra(new_infra, self.project, commit=True) - self._registry.update_infra(new_infra, self.project, commit=True) + if progress_ctx: + progress_ctx.update_phase_progress("Registry update complete") + progress_ctx.complete_phase() + finally: + # Always cleanup progress bars + if progress_ctx: + progress_ctx.cleanup() def apply( self, diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index b0809bdd399..fa5d297752a 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -358,22 +358,40 @@ def apply_total_with_repo_instance( views_to_delete, ) = extract_objects_for_apply_delete(project_name, registry, repo) - if store._should_use_plan(): - registry_diff, infra_diff, new_infra = store.plan( - repo, skip_feature_view_validation=skip_feature_view_validation - ) - click.echo(registry_diff.to_string()) + try: + if store._should_use_plan(): + # Planning phase - compute diffs first without progress bars + registry_diff, infra_diff, new_infra = store.plan( + repo, + skip_feature_view_validation=skip_feature_view_validation, + ) + click.echo(registry_diff.to_string()) - store._apply_diffs(registry_diff, infra_diff, new_infra) - click.echo(infra_diff.to_string()) - else: - store.apply( - all_to_apply, - objects_to_delete=all_to_delete, - partial=False, - skip_feature_view_validation=skip_feature_view_validation, - ) - log_infra_changes(views_to_keep, views_to_delete) + # Only show progress bars if there are actual infrastructure changes + progress_ctx = None + if len(infra_diff.infra_object_diffs) > 0: + from feast.diff.apply_progress import ApplyProgressContext + + progress_ctx = ApplyProgressContext() + progress_ctx.start_overall_progress() + + # Apply phase + store._apply_diffs( + registry_diff, infra_diff, new_infra, progress_ctx=progress_ctx + ) + click.echo(infra_diff.to_string()) + else: + # Legacy apply path - no progress bars for legacy path + store.apply( + all_to_apply, + objects_to_delete=all_to_delete, + partial=False, + skip_feature_view_validation=skip_feature_view_validation, + ) + log_infra_changes(views_to_keep, views_to_delete) + finally: + # Cleanup is handled in the new _apply_diffs method + pass def log_infra_changes(