Skip to content

Commit 0d2d951

Browse files
authored
feat: Allow passing repo config path via flag (feast-dev#3077)
* WIP ci: Docs on structuring feature repos Signed-off-by: Achal Shah <achals@gmail.com> * plumb Signed-off-by: Achal Shah <achals@gmail.com> * plumb Signed-off-by: Achal Shah <achals@gmail.com> * fix load_repo_config Signed-off-by: Achal Shah <achals@gmail.com> * tests Signed-off-by: Achal Shah <achals@gmail.com> * tests Signed-off-by: Achal Shah <achals@gmail.com> * tests Signed-off-by: Achal Shah <achals@gmail.com> Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 55c61f9 commit 0d2d951

8 files changed

Lines changed: 131 additions & 67 deletions

File tree

java/serving/src/test/resources/docker-compose/feast10/setup_it.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def main():
6464
print("Running setup_it.py")
6565

6666
setup_data()
67-
existing_repo_config = load_repo_config(Path("."))
67+
existing_repo_config = load_repo_config(Path("."), Path(".") / "feature_store.yaml")
6868

6969
# Update to default online store since otherwise, relies on Dockerized Redis service
7070
fs = FeatureStore(config=existing_repo_config.copy(update={"online_store": {}}))

sdk/python/feast/cli.py

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,17 @@ def format_options(self, ctx: click.Context, formatter: click.HelpFormatter):
7070
default="info",
7171
help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).",
7272
)
73+
@click.option(
74+
"--feature-store-yaml",
75+
help="Override the directory where the CLI should look for the feature_store.yaml file.",
76+
)
7377
@click.pass_context
74-
def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
78+
def cli(
79+
ctx: click.Context,
80+
chdir: Optional[str],
81+
log_level: str,
82+
feature_store_yaml: Optional[str],
83+
):
7584
"""
7685
Feast CLI
7786
@@ -81,6 +90,11 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
8190
"""
8291
ctx.ensure_object(dict)
8392
ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute()
93+
ctx.obj["FS_YAML_FILE"] = (
94+
Path(feature_store_yaml).absolute()
95+
if feature_store_yaml
96+
else ctx.obj["CHDIR"] / "feature_store.yaml"
97+
)
8498
try:
8599
level = getattr(logging, log_level.upper())
86100
logging.basicConfig(
@@ -141,8 +155,9 @@ def ui(ctx: click.Context, host: str, port: int, registry_ttl_sec: int):
141155
Shows the Feast UI over the current directory
142156
"""
143157
repo = ctx.obj["CHDIR"]
144-
cli_check_repo(repo)
145-
store = FeatureStore(repo_path=str(repo))
158+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
159+
cli_check_repo(repo, fs_yaml_file)
160+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
146161
# Pass in the registry_dump method to get around a circular dependency
147162
store.serve_ui(
148163
host=host,
@@ -159,8 +174,9 @@ def endpoint(ctx: click.Context):
159174
Display feature server endpoints
160175
"""
161176
repo = ctx.obj["CHDIR"]
162-
cli_check_repo(repo)
163-
store = FeatureStore(repo_path=str(repo))
177+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
178+
cli_check_repo(repo, fs_yaml_file)
179+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
164180
endpoint = store.get_feature_server_endpoint()
165181
if endpoint is not None:
166182
_logger.info(
@@ -186,8 +202,9 @@ def data_source_describe(ctx: click.Context, name: str):
186202
Describe a data source
187203
"""
188204
repo = ctx.obj["CHDIR"]
189-
cli_check_repo(repo)
190-
store = FeatureStore(repo_path=str(repo))
205+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
206+
cli_check_repo(repo, fs_yaml_file)
207+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
191208

192209
try:
193210
data_source = store.get_data_source(name)
@@ -209,8 +226,9 @@ def data_source_list(ctx: click.Context):
209226
List all data sources
210227
"""
211228
repo = ctx.obj["CHDIR"]
212-
cli_check_repo(repo)
213-
store = FeatureStore(repo_path=str(repo))
229+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
230+
cli_check_repo(repo, fs_yaml_file)
231+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
214232
table = []
215233
for datasource in store.list_data_sources():
216234
table.append([datasource.name, datasource.__class__])
@@ -236,8 +254,9 @@ def entity_describe(ctx: click.Context, name: str):
236254
Describe an entity
237255
"""
238256
repo = ctx.obj["CHDIR"]
239-
cli_check_repo(repo)
240-
store = FeatureStore(repo_path=str(repo))
257+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
258+
cli_check_repo(repo, fs_yaml_file)
259+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
241260

242261
try:
243262
entity = store.get_entity(name)
@@ -259,8 +278,9 @@ def entity_list(ctx: click.Context):
259278
List all entities
260279
"""
261280
repo = ctx.obj["CHDIR"]
262-
cli_check_repo(repo)
263-
store = FeatureStore(repo_path=str(repo))
281+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
282+
cli_check_repo(repo, fs_yaml_file)
283+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
264284
table = []
265285
for entity in store.list_entities():
266286
table.append([entity.name, entity.description, entity.value_type])
@@ -286,8 +306,9 @@ def feature_service_describe(ctx: click.Context, name: str):
286306
Describe a feature service
287307
"""
288308
repo = ctx.obj["CHDIR"]
289-
cli_check_repo(repo)
290-
store = FeatureStore(repo_path=str(repo))
309+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
310+
cli_check_repo(repo, fs_yaml_file)
311+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
291312

292313
try:
293314
feature_service = store.get_feature_service(name)
@@ -311,8 +332,9 @@ def feature_service_list(ctx: click.Context):
311332
List all feature services
312333
"""
313334
repo = ctx.obj["CHDIR"]
314-
cli_check_repo(repo)
315-
store = FeatureStore(repo_path=str(repo))
335+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
336+
cli_check_repo(repo, fs_yaml_file)
337+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
316338
feature_services = []
317339
for feature_service in store.list_feature_services():
318340
feature_names = []
@@ -343,8 +365,9 @@ def feature_view_describe(ctx: click.Context, name: str):
343365
Describe a feature view
344366
"""
345367
repo = ctx.obj["CHDIR"]
346-
cli_check_repo(repo)
347-
store = FeatureStore(repo_path=str(repo))
368+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
369+
cli_check_repo(repo, fs_yaml_file)
370+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
348371

349372
try:
350373
feature_view = store.get_feature_view(name)
@@ -366,8 +389,10 @@ def feature_view_list(ctx: click.Context):
366389
List all feature views
367390
"""
368391
repo = ctx.obj["CHDIR"]
369-
cli_check_repo(repo)
370-
store = FeatureStore(repo_path=str(repo))
392+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
393+
394+
cli_check_repo(repo, fs_yaml_file)
395+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
371396
table = []
372397
for feature_view in [
373398
*store.list_feature_views(),
@@ -409,8 +434,9 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
409434
[Experimental] Describe an on demand feature view
410435
"""
411436
repo = ctx.obj["CHDIR"]
412-
cli_check_repo(repo)
413-
store = FeatureStore(repo_path=str(repo))
437+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
438+
cli_check_repo(repo, fs_yaml_file)
439+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
414440

415441
try:
416442
on_demand_feature_view = store.get_on_demand_feature_view(name)
@@ -434,8 +460,9 @@ def on_demand_feature_view_list(ctx: click.Context):
434460
[Experimental] List all on demand feature views
435461
"""
436462
repo = ctx.obj["CHDIR"]
437-
cli_check_repo(repo)
438-
store = FeatureStore(repo_path=str(repo))
463+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
464+
cli_check_repo(repo, fs_yaml_file)
465+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
439466
table = []
440467
for on_demand_feature_view in store.list_on_demand_feature_views():
441468
table.append([on_demand_feature_view.name])
@@ -457,8 +484,9 @@ def plan_command(ctx: click.Context, skip_source_validation: bool):
457484
Create or update a feature store deployment
458485
"""
459486
repo = ctx.obj["CHDIR"]
460-
cli_check_repo(repo)
461-
repo_config = load_repo_config(repo)
487+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
488+
cli_check_repo(repo, fs_yaml_file)
489+
repo_config = load_repo_config(repo, fs_yaml_file)
462490
try:
463491
plan(repo_config, repo, skip_source_validation)
464492
except FeastProviderLoginError as e:
@@ -477,8 +505,10 @@ def apply_total_command(ctx: click.Context, skip_source_validation: bool):
477505
Create or update a feature store deployment
478506
"""
479507
repo = ctx.obj["CHDIR"]
480-
cli_check_repo(repo)
481-
repo_config = load_repo_config(repo)
508+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
509+
cli_check_repo(repo, fs_yaml_file)
510+
511+
repo_config = load_repo_config(repo, fs_yaml_file)
482512
try:
483513
apply_total(repo_config, repo, skip_source_validation)
484514
except FeastProviderLoginError as e:
@@ -492,8 +522,9 @@ def teardown_command(ctx: click.Context):
492522
Tear down deployed feature store infrastructure
493523
"""
494524
repo = ctx.obj["CHDIR"]
495-
cli_check_repo(repo)
496-
repo_config = load_repo_config(repo)
525+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
526+
cli_check_repo(repo, fs_yaml_file)
527+
repo_config = load_repo_config(repo, fs_yaml_file)
497528

498529
teardown(repo_config, repo)
499530

@@ -505,8 +536,9 @@ def registry_dump_command(ctx: click.Context):
505536
Print contents of the metadata registry
506537
"""
507538
repo = ctx.obj["CHDIR"]
508-
cli_check_repo(repo)
509-
repo_config = load_repo_config(repo)
539+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
540+
cli_check_repo(repo, fs_yaml_file)
541+
repo_config = load_repo_config(repo, fs_yaml_file)
510542

511543
click.echo(registry_dump(repo_config, repo_path=repo))
512544

@@ -533,8 +565,9 @@ def materialize_command(
533565
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
534566
"""
535567
repo = ctx.obj["CHDIR"]
536-
cli_check_repo(repo)
537-
store = FeatureStore(repo_path=str(repo))
568+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
569+
cli_check_repo(repo, fs_yaml_file)
570+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
538571
store.materialize(
539572
feature_views=None if not views else views,
540573
start_date=utils.make_tzaware(parser.parse(start_ts)),
@@ -561,8 +594,9 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
561594
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
562595
"""
563596
repo = ctx.obj["CHDIR"]
564-
cli_check_repo(repo)
565-
store = FeatureStore(repo_path=str(repo))
597+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
598+
cli_check_repo(repo, fs_yaml_file)
599+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
566600
store.materialize_incremental(
567601
feature_views=None if not views else views,
568602
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
@@ -651,8 +685,9 @@ def serve_command(
651685
):
652686
"""Start a feature server locally on a given port."""
653687
repo = ctx.obj["CHDIR"]
654-
cli_check_repo(repo)
655-
store = FeatureStore(repo_path=str(repo))
688+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
689+
cli_check_repo(repo, fs_yaml_file)
690+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
656691

657692
if go:
658693
# Turn on Go feature retrieval.
@@ -673,8 +708,9 @@ def serve_command(
673708
def serve_transformations_command(ctx: click.Context, port: int):
674709
"""[Experimental] Start a feature consumption server locally on a given port."""
675710
repo = ctx.obj["CHDIR"]
676-
cli_check_repo(repo)
677-
store = FeatureStore(repo_path=str(repo))
711+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
712+
cli_check_repo(repo, fs_yaml_file)
713+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
678714

679715
store.serve_transformations(port)
680716

@@ -712,8 +748,9 @@ def validate(
712748
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
713749
"""
714750
repo = ctx.obj["CHDIR"]
715-
cli_check_repo(repo)
716-
store = FeatureStore(repo_path=str(repo))
751+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
752+
cli_check_repo(repo, fs_yaml_file)
753+
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
717754

718755
feature_service = store.get_feature_service(name=feature_service)
719756
reference = store.get_validation_reference(reference)
@@ -754,7 +791,8 @@ def repo_upgrade(ctx: click.Context, write: bool):
754791
Upgrade a feature repo in place.
755792
"""
756793
repo = ctx.obj["CHDIR"]
757-
cli_check_repo(repo)
794+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
795+
cli_check_repo(repo, fs_yaml_file)
758796
try:
759797
RepoUpgrader(repo, write).upgrade()
760798
except FeastProviderLoginError as e:

sdk/python/feast/feature_store.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,30 +120,41 @@ class FeatureStore:
120120
repo_path: Path
121121
_registry: BaseRegistry
122122
_provider: Provider
123-
_go_server: "EmbeddedOnlineFeatureServer"
123+
_go_server: Optional["EmbeddedOnlineFeatureServer"]
124124

125125
@log_exceptions
126126
def __init__(
127127
self,
128128
repo_path: Optional[str] = None,
129129
config: Optional[RepoConfig] = None,
130+
fs_yaml_file: Optional[Path] = None,
130131
):
131132
"""
132133
Creates a FeatureStore object.
133134
134135
Raises:
135136
ValueError: If both or neither of repo_path and config are specified.
136137
"""
137-
if repo_path is not None and config is not None:
138-
raise ValueError("You cannot specify both repo_path and config.")
139-
if config is not None:
138+
if fs_yaml_file is not None and config is not None:
139+
raise ValueError("You cannot specify both fs_yaml_dir and config.")
140+
141+
if repo_path:
142+
self.repo_path = Path(repo_path)
143+
else:
140144
self.repo_path = Path(os.getcwd())
145+
146+
# If config is specified, or fs_yaml_file is specified, those take precedence over
147+
# the default feature_store.yaml location under repo_path.
148+
if config is not None:
141149
self.config = config
142-
elif repo_path is not None:
143-
self.repo_path = Path(repo_path)
144-
self.config = load_repo_config(Path(repo_path))
150+
elif fs_yaml_file is not None:
151+
self.config = load_repo_config(self.repo_path, fs_yaml_file)
152+
elif repo_path:
153+
self.config = load_repo_config(
154+
self.repo_path, Path(repo_path) / "feature_store.yaml"
155+
)
145156
else:
146-
raise ValueError("Please specify one of repo_path or config.")
157+
raise ValueError("Please specify one of fs_yaml_dir or config.")
147158

148159
registry_config = self.config.get_registry_config()
149160
if registry_config.registry_type == "sql":
@@ -152,7 +163,8 @@ def __init__(
152163
r = Registry(registry_config, repo_path=self.repo_path)
153164
r._initialize_registry(self.config.project)
154165
self._registry = r
155-
self._provider = get_provider(self.config, self.repo_path)
166+
167+
self._provider = get_provider(self.config)
156168
self._go_server = None
157169

158170
@log_exceptions
@@ -1587,7 +1599,7 @@ def _get_online_features(
15871599
}
15881600

15891601
# If the embedded Go code is enabled, send request to it instead of going through regular Python logic.
1590-
if self.config.go_feature_retrieval:
1602+
if self.config.go_feature_retrieval and self._go_server:
15911603
self._lazy_init_go_server()
15921604

15931605
entity_native_values: Dict[str, List[Any]]
@@ -2239,7 +2251,7 @@ def serve(
22392251
) -> None:
22402252
"""Start the feature consumption server locally on a given port."""
22412253
type_ = type_.lower()
2242-
if self.config.go_feature_serving:
2254+
if self.config.go_feature_serving and self._go_server:
22432255
# Start go server instead of python if the flag is enabled
22442256
self._lazy_init_go_server()
22452257
enable_logging = (

sdk/python/feast/infra/provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def get_feature_server_endpoint(self) -> Optional[str]:
297297
return None
298298

299299

300-
def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
300+
def get_provider(config: RepoConfig) -> Provider:
301301
if "." not in config.provider:
302302
if config.provider not in PROVIDERS_CLASS_FOR_TYPE:
303303
raise errors.FeastProviderNotImplementedError(config.provider)

0 commit comments

Comments
 (0)