Skip to content

Commit 9ec90f4

Browse files
authored
Improve exception handling, logging, and validation (feast-dev#1477)
* Add logging to apply method Signed-off-by: Willem Pienaar <git@willem.co> * Linting changes Signed-off-by: Willem Pienaar <git@willem.co> * Add error message when missing authentication for GCP provider Signed-off-by: Willem Pienaar <git@willem.co> * Add proper error message for missing feature views in historical retrieval Signed-off-by: Willem Pienaar <git@willem.co> * Add proper error message for credential error for bigquery client Signed-off-by: Willem Pienaar <git@willem.co> * Add proper gcp project missing error for bigquery client Signed-off-by: Willem Pienaar <git@willem.co> * Ensure all apply commands initialize a feature repository Signed-off-by: Willem Pienaar <git@willem.co> * Ensure FeatureStore object can be run from outside a repository Signed-off-by: Willem Pienaar <git@willem.co> * Fix linting Signed-off-by: Willem Pienaar <git@willem.co> * Fix missing sqlite database when using relative paths Signed-off-by: Willem Pienaar <git@willem.co> * Small tweaks based on PR feedback Signed-off-by: Willem Pienaar <git@willem.co>
1 parent b8ef24e commit 9ec90f4

12 files changed

Lines changed: 230 additions & 68 deletions

File tree

sdk/python/feast/cli.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import pkg_resources
2222
import yaml
2323

24-
from feast.errors import FeastObjectNotFoundException
24+
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
2525
from feast.feature_store import FeatureStore
2626
from feast.repo_config import load_repo_config
2727
from feast.repo_operations import (
@@ -156,8 +156,10 @@ def apply_total_command():
156156
"""
157157
cli_check_repo(Path.cwd())
158158
repo_config = load_repo_config(Path.cwd())
159-
160-
apply_total(repo_config, Path.cwd())
159+
try:
160+
apply_total(repo_config, Path.cwd())
161+
except FeastProviderLoginError as e:
162+
print(str(e))
161163

162164

163165
@cli.command("teardown")
@@ -179,7 +181,7 @@ def registry_dump_command():
179181
cli_check_repo(Path.cwd())
180182
repo_config = load_repo_config(Path.cwd())
181183

182-
registry_dump(repo_config)
184+
registry_dump(repo_config, repo_path=Path.cwd())
183185

184186

185187
@cli.command("materialize")
@@ -244,14 +246,6 @@ def init_command(project_directory, minimal: bool, template: str):
244246
"""Create a new Feast repository"""
245247
if not project_directory:
246248
project_directory = generate_project_name()
247-
if template and minimal:
248-
from colorama import Fore, Style
249-
250-
click.echo(
251-
f"Please select either a {Style.BRIGHT + Fore.GREEN}template{Style.RESET_ALL} or "
252-
f"{Style.BRIGHT + Fore.GREEN}minimal{Style.RESET_ALL}, not both"
253-
)
254-
exit(1)
255249

256250
if minimal:
257251
template = "minimal"

sdk/python/feast/errors.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,30 @@ class FeastObjectNotFoundException(Exception):
33

44

55
class EntityNotFoundException(FeastObjectNotFoundException):
6-
def __init__(self, project, name):
7-
super().__init__(f"Entity {name} does not exist in project {project}")
6+
def __init__(self, name, project=None):
7+
if project:
8+
super().__init__(f"Entity {name} does not exist in project {project}")
9+
else:
10+
super().__init__(f"Entity {name} does not exist")
811

912

1013
class FeatureViewNotFoundException(FeastObjectNotFoundException):
11-
def __init__(self, project, name):
12-
super().__init__(f"Feature view {name} does not exist in project {project}")
14+
def __init__(self, name, project=None):
15+
if project:
16+
super().__init__(f"Feature view {name} does not exist in project {project}")
17+
else:
18+
super().__init__(f"Feature view {name} does not exist")
1319

1420

1521
class FeatureTableNotFoundException(FeastObjectNotFoundException):
16-
def __init__(self, project, name):
17-
super().__init__(f"Feature table {name} does not exist in project {project}")
22+
def __init__(self, name, project=None):
23+
if project:
24+
super().__init__(
25+
f"Feature table {name} does not exist in project {project}"
26+
)
27+
else:
28+
super().__init__(f"Feature table {name} does not exist")
29+
30+
31+
class FeastProviderLoginError(Exception):
32+
"""Error class that indicates a user has not authenticated with their provider."""

sdk/python/feast/feature_store.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import os
15+
import sys
1416
from collections import OrderedDict, defaultdict
1517
from datetime import datetime, timedelta
1618
from pathlib import Path
@@ -21,6 +23,7 @@
2123

2224
from feast import utils
2325
from feast.entity import Entity
26+
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
2427
from feast.feature_view import FeatureView
2528
from feast.infra.provider import Provider, RetrievalJob, get_provider
2629
from feast.online_response import OnlineResponse, _infer_online_entity_rows
@@ -41,7 +44,7 @@ class FeatureStore:
4144
"""
4245

4346
config: RepoConfig
44-
repo_path: Optional[str]
47+
repo_path: Path
4548
_registry: Registry
4649

4750
def __init__(
@@ -53,19 +56,21 @@ def __init__(
5356
repo_path: Path to a `feature_store.yaml` used to configure the feature store
5457
config (RepoConfig): Configuration object used to configure the feature store
5558
"""
56-
self.repo_path = repo_path
5759
if repo_path is not None and config is not None:
5860
raise ValueError("You cannot specify both repo_path and config")
5961
if config is not None:
62+
self.repo_path = Path(os.getcwd())
6063
self.config = config
6164
elif repo_path is not None:
65+
self.repo_path = Path(repo_path)
6266
self.config = load_repo_config(Path(repo_path))
6367
else:
6468
raise ValueError("Please specify one of repo_path or config")
6569

6670
registry_config = self.config.get_registry_config()
6771
self._registry = Registry(
6872
registry_path=registry_config.path,
73+
repo_path=self.repo_path,
6974
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
7075
)
7176
self._tele = Telemetry()
@@ -80,7 +85,8 @@ def project(self) -> str:
8085
return self.config.project
8186

8287
def _get_provider(self) -> Provider:
83-
return get_provider(self.config)
88+
# TODO: Bake self.repo_path into self.config so that we only have one interface to paths
89+
return get_provider(self.config, self.repo_path)
8490

8591
def refresh_registry(self):
8692
"""Fetches and caches a copy of the feature registry in memory.
@@ -101,6 +107,7 @@ def refresh_registry(self):
101107
registry_config = self.config.get_registry_config()
102108
self._registry = Registry(
103109
registry_path=registry_config.path,
110+
repo_path=self.repo_path,
104111
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
105112
)
106113
self._registry.refresh()
@@ -271,16 +278,26 @@ def get_historical_features(
271278
all_feature_views = self._registry.list_feature_views(
272279
project=self.config.project
273280
)
274-
feature_views = _get_requested_feature_views(feature_refs, all_feature_views)
281+
try:
282+
feature_views = _get_requested_feature_views(
283+
feature_refs, all_feature_views
284+
)
285+
except FeatureViewNotFoundException as e:
286+
sys.exit(e)
287+
275288
provider = self._get_provider()
276-
job = provider.get_historical_features(
277-
self.config,
278-
feature_views,
279-
feature_refs,
280-
entity_df,
281-
self._registry,
282-
self.project,
283-
)
289+
try:
290+
job = provider.get_historical_features(
291+
self.config,
292+
feature_views,
293+
feature_refs,
294+
entity_df,
295+
self._registry,
296+
self.project,
297+
)
298+
except FeastProviderLoginError as e:
299+
sys.exit(e)
300+
284301
return job
285302

286303
def materialize_incremental(
@@ -529,7 +546,7 @@ def _group_refs(
529546
for ref in feature_refs:
530547
view_name, feat_name = ref.split(":")
531548
if view_name not in view_index:
532-
raise ValueError(f"Could not find feature view from reference {ref}")
549+
raise FeatureViewNotFoundException(view_name)
533550
views_features[view_name].append(feat_name)
534551

535552
result = []

sdk/python/feast/infra/gcp.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import mmh3
77
import pandas
88
import pyarrow
9+
from google.auth.exceptions import DefaultCredentialsError
910

1011
from feast import FeatureTable, utils
12+
from feast.errors import FeastProviderLoginError
1113
from feast.feature_view import FeatureView
1214
from feast.infra.key_encoding_utils import serialize_entity_key
1315
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
@@ -37,10 +39,16 @@ def __init__(self, config: RepoConfig):
3739
def _initialize_client(self):
3840
from google.cloud import datastore
3941

40-
if self._gcp_project_id is not None:
41-
return datastore.Client(self._gcp_project_id)
42-
else:
43-
return datastore.Client()
42+
try:
43+
if self._gcp_project_id is not None:
44+
return datastore.Client(self._gcp_project_id)
45+
else:
46+
return datastore.Client()
47+
except DefaultCredentialsError as e:
48+
raise FeastProviderLoginError(
49+
str(e)
50+
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your local Google Cloud account'
51+
)
4452

4553
def update_infra(
4654
self,

sdk/python/feast/infra/local.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,19 @@
2525

2626

2727
class LocalProvider(Provider):
28-
_db_path: str
28+
_db_path: Path
2929

30-
def __init__(self, config: RepoConfig):
30+
def __init__(self, config: RepoConfig, repo_path: Path):
3131

3232
assert config is not None
3333
assert config.online_store is not None
3434
local_online_store_config = config.online_store
3535
assert isinstance(local_online_store_config, SqliteOnlineStoreConfig)
36-
self._db_path = local_online_store_config.path
36+
local_path = Path(local_online_store_config.path)
37+
if local_path.is_absolute():
38+
self._db_path = local_path
39+
else:
40+
self._db_path = repo_path.joinpath(local_path)
3741

3842
def _get_conn(self):
3943
Path(self._db_path).parent.mkdir(exist_ok=True)

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
import pandas
77
import pyarrow
8+
from google.auth.exceptions import DefaultCredentialsError
89
from google.cloud import bigquery
910
from jinja2 import BaseLoader, Environment
1011

1112
from feast.data_source import BigQuerySource, DataSource
13+
from feast.errors import FeastProviderLoginError
1214
from feast.feature_view import FeatureView
1315
from feast.infra.offline_stores.offline_store import OfflineStore
1416
from feast.infra.provider import (
@@ -59,9 +61,7 @@ def pull_latest_from_table_or_query(
5961

6062
@staticmethod
6163
def _pull_query(query: str) -> pyarrow.Table:
62-
from google.cloud import bigquery
63-
64-
client = bigquery.Client()
64+
client = _get_bigquery_client()
6565
query_job = client.query(query)
6666
return query_job.to_arrow()
6767

@@ -76,14 +76,18 @@ def get_historical_features(
7676
) -> RetrievalJob:
7777
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
7878

79+
client = _get_bigquery_client()
80+
7981
if type(entity_df) is str:
8082
entity_df_sql_table = f"({entity_df})"
8183
elif isinstance(entity_df, pandas.DataFrame):
8284
if "event_timestamp" not in entity_df.columns:
8385
raise ValueError(
8486
"Please provide an entity_df with a column named event_timestamp representing the time of events."
8587
)
86-
table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
88+
table_id = _upload_entity_df_into_bigquery(
89+
config.project, entity_df, client
90+
)
8791
entity_df_sql_table = f"`{table_id}`"
8892
else:
8993
raise ValueError(
@@ -104,18 +108,19 @@ def get_historical_features(
104108
max_timestamp=datetime.now() + timedelta(days=1),
105109
left_table_query_string=entity_df_sql_table,
106110
)
107-
job = BigQueryRetrievalJob(query=query)
111+
112+
job = BigQueryRetrievalJob(query=query, client=client)
108113
return job
109114

110115

111116
class BigQueryRetrievalJob(RetrievalJob):
112-
def __init__(self, query):
117+
def __init__(self, query, client):
113118
self.query = query
119+
self.client = client
114120

115121
def to_df(self):
116122
# TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
117-
client = bigquery.Client()
118-
df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
123+
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
119124
return df
120125

121126

@@ -135,9 +140,8 @@ class FeatureViewQueryContext:
135140
entity_selections: List[str]
136141

137142

138-
def _upload_entity_df_into_bigquery(project, entity_df) -> str:
143+
def _upload_entity_df_into_bigquery(project, entity_df, client) -> str:
139144
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
140-
client = bigquery.Client()
141145

142146
# First create the BigQuery dataset if it doesn't exist
143147
dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
@@ -244,6 +248,28 @@ def build_point_in_time_query(
244248
return query
245249

246250

251+
def _get_bigquery_client():
252+
try:
253+
from google.cloud import bigquery
254+
255+
client = bigquery.Client()
256+
except DefaultCredentialsError as e:
257+
raise FeastProviderLoginError(
258+
str(e)
259+
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
260+
"local Google Cloud account"
261+
)
262+
except EnvironmentError as e:
263+
raise FeastProviderLoginError(
264+
"GCP error: "
265+
+ str(e)
266+
+ "\nIt may be necessary to set a default GCP project by running "
267+
'"gcloud config set project your-project"'
268+
)
269+
270+
return client
271+
272+
247273
# TODO: Optimizations
248274
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
249275
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe

sdk/python/feast/infra/provider.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import abc
22
from datetime import datetime
3+
from pathlib import Path
34
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
45

56
import pandas
@@ -123,15 +124,15 @@ def online_read(
123124
...
124125

125126

126-
def get_provider(config: RepoConfig) -> Provider:
127+
def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
127128
if config.provider == "gcp":
128129
from feast.infra.gcp import GcpProvider
129130

130131
return GcpProvider(config)
131132
elif config.provider == "local":
132133
from feast.infra.local import LocalProvider
133134

134-
return LocalProvider(config)
135+
return LocalProvider(config, repo_path)
135136
else:
136137
raise ValueError(config)
137138

0 commit comments

Comments
 (0)