Skip to content

Commit 0892cd1

Browse files
authored
Add Feast init command (feast-dev#1414)
* add feast init command Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> * tweak init Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 6d12589 commit 0892cd1

8 files changed

Lines changed: 173 additions & 8 deletions

File tree

sdk/python/feast/cli.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@
3131
from feast.feature_table import FeatureTable
3232
from feast.loaders.yaml import yaml_loader
3333
from feast.repo_config import load_repo_config
34-
from feast.repo_operations import apply_total, registry_dump, teardown
34+
from feast.repo_operations import (
35+
apply_total,
36+
cli_check_repo,
37+
init_repo,
38+
registry_dump,
39+
teardown,
40+
)
3541

3642
_logger = logging.getLogger(__name__)
3743

@@ -360,22 +366,28 @@ def project_list():
360366

361367

362368
@cli.command("apply")
363-
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
369+
@click.argument(
370+
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
371+
)
364372
def apply_total_command(repo_path: str):
365373
"""
366374
Applies a feature repo
367375
"""
376+
cli_check_repo(Path(repo_path))
368377
repo_config = load_repo_config(Path(repo_path))
369378

370379
apply_total(repo_config, Path(repo_path).resolve())
371380

372381

373382
@cli.command("teardown")
374-
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
383+
@click.argument(
384+
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
385+
)
375386
def teardown_command(repo_path: str):
376387
"""
377388
Tear down infra for a feature repo
378389
"""
390+
cli_check_repo(Path(repo_path))
379391
repo_config = load_repo_config(Path(repo_path))
380392

381393
teardown(repo_config, Path(repo_path).resolve())
@@ -393,13 +405,15 @@ def registry_dump_command(repo_path: str):
393405

394406

395407
@cli.command("materialize")
396-
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
397408
@click.argument("start_ts")
398409
@click.argument("end_ts")
410+
@click.argument(
411+
"repo_path", type=click.Path(dir_okay=True, exists=True,), default=Path.cwd
412+
)
399413
@click.option(
400414
"--views", "-v", help="Feature views to materialize", multiple=True,
401415
)
402-
def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
416+
def materialize_command(start_ts: str, end_ts: str, repo_path: str, views: List[str]):
403417
"""
404418
Run a (non-incremental) materialization job to ingest data into the online store. Feast
405419
will read all data between START_TS and END_TS from the offline store and write it to the
@@ -416,5 +430,12 @@ def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[
416430
)
417431

418432

433+
@cli.command("init")
434+
@click.option("--minimal", "-m", is_flag=True, help="Only generate the config")
435+
def init_command(minimal: bool):
436+
repo_path = Path.cwd()
437+
init_repo(repo_path, minimal)
438+
439+
419440
if __name__ == "__main__":
420441
cli()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# This module generates dummy data to be used for tests and examples.
12
import numpy as np
23
import pandas as pd
34

sdk/python/feast/example_repo.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# This is an example feature definition file
2+
3+
from google.protobuf.duration_pb2 import Duration
4+
5+
from feast import Entity, Feature, FeatureView, ValueType
6+
from feast.data_source import FileSource
7+
8+
# Read data from parquet files. Parquet is convenient for local development mode. For
9+
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
10+
# for more info.
11+
driver_hourly_stats = FileSource(
12+
path="%PARQUET_PATH%",
13+
event_timestamp_column="datetime",
14+
created_timestamp_column="created",
15+
)
16+
17+
# Define an entity for the driver. You can think of entity as a primary key used to
18+
# fetch features.
19+
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
20+
21+
# Our parquet files contain sample data that includes a driver_id column, timestamps and
22+
# three feature column. Here we define a Feature View that will allow us to serve this
23+
# data to our model online.
24+
driver_hourly_stats_view = FeatureView(
25+
name="driver_hourly_stats",
26+
entities=["driver_id"],
27+
ttl=Duration(seconds=86400 * 1),
28+
features=[
29+
Feature(name="conv_rate", dtype=ValueType.FLOAT),
30+
Feature(name="acc_rate", dtype=ValueType.FLOAT),
31+
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
32+
],
33+
online=True,
34+
input=driver_hourly_stats,
35+
tags={},
36+
)

sdk/python/feast/repo_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ def __repr__(self) -> str:
9797

9898
def load_repo_config(repo_path: Path) -> RepoConfig:
9999
config_path = repo_path / "feature_store.yaml"
100+
100101
with open(config_path) as f:
101102
raw_config = yaml.safe_load(f)
102103
try:

sdk/python/feast/repo_operations.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
import os
33
import random
44
import sys
5+
from datetime import datetime, timedelta
56
from pathlib import Path
7+
from textwrap import dedent
68
from typing import List, NamedTuple, Union
79

810
from feast import Entity, FeatureTable
11+
from feast.driver_test_data import create_driver_hourly_stats_df
912
from feast.feature_view import FeatureView
1013
from feast.infra.provider import get_provider
1114
from feast.names import adjectives, animals
@@ -136,6 +139,83 @@ def registry_dump(repo_config: RepoConfig):
136139
print(table)
137140

138141

142+
def cli_check_repo(repo_path: Path):
143+
config_path = repo_path / "feature_store.yaml"
144+
if not config_path.exists():
145+
print(
146+
f"Can't find feature_store.yaml at {repo_path}. Make sure you're running this command in an initialized feast repository."
147+
)
148+
sys.exit(1)
149+
150+
151+
def init_repo(repo_path: Path, minimal: bool):
152+
153+
repo_config = repo_path / "feature_store.yaml"
154+
155+
if repo_config.exists():
156+
print("Feature repository is already initalized, nothing to do.")
157+
sys.exit(1)
158+
159+
project_id = generate_project_name()
160+
161+
if minimal:
162+
repo_config.write_text(
163+
dedent(
164+
f"""
165+
project: {project_id}
166+
metadata_store: /path/to/metadata.db
167+
provider: local
168+
online_store:
169+
local:
170+
path: /path/to/online_store.db
171+
"""
172+
)
173+
)
174+
print(
175+
"Generated example feature_store.yaml. Please edit metadata_store and online_store"
176+
"location before running apply"
177+
)
178+
179+
else:
180+
example_py = (Path(__file__).parent / "example_repo.py").read_text()
181+
182+
data_path = repo_path / "data"
183+
data_path.mkdir(exist_ok=True)
184+
185+
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
186+
start_date = end_date - timedelta(days=15)
187+
188+
driver_entities = [1001, 1002, 1003, 1004, 1005]
189+
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
190+
191+
driver_stats_path = data_path / "driver_stats.parquet"
192+
driver_df.to_parquet(
193+
path=str(driver_stats_path), allow_truncated_timestamps=True
194+
)
195+
196+
with open(repo_path / "example.py", "wt") as f:
197+
f.write(example_py.replace("%PARQUET_PATH%", str(driver_stats_path)))
198+
199+
# Generate config
200+
repo_config.write_text(
201+
dedent(
202+
f"""
203+
project: {project_id}
204+
metadata_store: {"data/metadata.db"}
205+
provider: local
206+
online_store:
207+
local:
208+
path: {"data/online_store.db"}
209+
"""
210+
)
211+
)
212+
213+
print("Generated feature_store.yaml and example features in example_repo.py")
214+
print(
215+
"Now try runing `feast apply` to apply, or `feast materialize` to sync data to the online store"
216+
)
217+
218+
139219
def generate_project_name() -> str:
140220
"""Generates a unique project name"""
141221
return f"{random.choice(adjectives)}_{random.choice(animals)}"

sdk/python/tests/cli/test_e2e_local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import pandas as pd
77

8-
import tests.driver_test_data as driver_data
8+
import feast.driver_test_data as driver_data
99
from tests.cli.utils import CliRunner, get_example_repo
1010

1111

@@ -57,9 +57,9 @@ def test_basic(self) -> None:
5757
r = runner.run(
5858
[
5959
"materialize",
60-
str(store.repo_path),
6160
start_date.isoformat(),
6261
end_date.isoformat(),
62+
str(store.repo_path),
6363
],
6464
cwd=Path(store.repo_path),
6565
)

sdk/python/tests/test_historical_retrieval.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from google.cloud import bigquery
1010
from pandas.testing import assert_frame_equal
1111

12-
import tests.driver_test_data as driver_data
12+
import feast.driver_test_data as driver_data
1313
from feast.data_source import BigQuerySource, FileSource
1414
from feast.entity import Entity
1515
from feast.feature import Feature

sdk/python/tests/test_init.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import tempfile
2+
from datetime import datetime, timedelta
3+
from pathlib import Path
4+
5+
from tests.cli.utils import CliRunner
6+
7+
8+
def test_repo_init() -> None:
9+
"""
10+
This test simply makes sure that you can run `feast apply && feast materialize` on
11+
the repo created by "feast init" without errors.
12+
"""
13+
runner = CliRunner()
14+
with tempfile.TemporaryDirectory() as repo_dir_name:
15+
repo_path = Path(repo_dir_name)
16+
result = runner.run(["init"], cwd=repo_path)
17+
assert result.returncode == 0
18+
result = runner.run(["apply"], cwd=repo_path)
19+
assert result.returncode == 0
20+
21+
end_date = datetime.utcnow()
22+
start_date = end_date - timedelta(days=100)
23+
result = runner.run(
24+
["materialize", start_date.isoformat(), end_date.isoformat()], cwd=repo_path
25+
)
26+
assert result.returncode == 0

0 commit comments

Comments
 (0)