Skip to content

Commit 5d0f37b

Browse files
authored
Use set when parsing repos to prevent duplicates (feast-dev#1913)
* Use set when parsing repos to prevent duplicates
  Signed-off-by: Achal Shah <achals@gmail.com>
* Undo feast-dev#1905 changes
  Signed-off-by: Achal Shah <achals@gmail.com>
* use id() in addition to name to differentiate different objects with the same name
  Signed-off-by: Achal Shah <achals@gmail.com>
* add a test
  Signed-off-by: Achal Shah <achals@gmail.com>
* add one more test
  Signed-off-by: Achal Shah <achals@gmail.com>
* remove debugging
  Signed-off-by: Achal Shah <achals@gmail.com>
1 parent ce5a130 commit 5d0f37b

9 files changed

Lines changed: 143 additions & 85 deletions

File tree

sdk/python/feast/entity.py

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -11,15 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import inspect
1514
from datetime import datetime
1615
from typing import Dict, Optional
1716

1817
import yaml
1918
from google.protobuf import json_format
2019
from google.protobuf.json_format import MessageToDict, MessageToJson
2120

22-
from feast.importer import get_calling_file_name
2321
from feast.loaders import yaml as feast_yaml
2422
from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto
2523
from feast.protos.feast.core.Entity_pb2 import EntityMeta as EntityMetaProto
@@ -50,8 +48,6 @@ class Entity:
5048
_created_timestamp: Optional[datetime]
5149
_last_updated_timestamp: Optional[datetime]
5250

53-
defined_in: str
54-
5551
@log_exceptions
5652
def __init__(
5753
self,
@@ -78,7 +74,8 @@ def __init__(
7874
self._created_timestamp: Optional[datetime] = None
7975
self._last_updated_timestamp: Optional[datetime] = None
8076

81-
self.defined_in = get_calling_file_name(inspect.stack())
77+
def __hash__(self) -> int:
78+
return hash((id(self), self.name))
8279

8380
def __eq__(self, other):
8481
if not isinstance(other, Entity):

sdk/python/feast/feature_service.py

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
import inspect
21
from datetime import datetime
32
from typing import Dict, List, Optional, Union
43

@@ -7,7 +6,6 @@
76
from feast.feature_table import FeatureTable
87
from feast.feature_view import FeatureView
98
from feast.feature_view_projection import FeatureViewProjection
10-
from feast.importer import get_calling_file_name
119
from feast.on_demand_feature_view import OnDemandFeatureView
1210
from feast.protos.feast.core.FeatureService_pb2 import (
1311
FeatureService as FeatureServiceProto,
@@ -39,8 +37,6 @@ class FeatureService:
3937
created_timestamp: Optional[datetime] = None
4038
last_updated_timestamp: Optional[datetime] = None
4139

42-
defined_in: str
43-
4440
@log_exceptions
4541
def __init__(
4642
self,
@@ -75,8 +71,6 @@ def __init__(
7571
self.created_timestamp = None
7672
self.last_updated_timestamp = None
7773

78-
self.defined_in = get_calling_file_name(inspect.stack())
79-
8074
def __repr__(self):
8175
items = (f"{k} = {v}" for k, v in self.__dict__.items())
8276
return f"<{self.__class__.__name__}({', '.join(items)})>"
@@ -85,7 +79,7 @@ def __str__(self):
8579
return str(MessageToJson(self.to_proto()))
8680

8781
def __hash__(self):
88-
return hash(self.name)
82+
return hash((id(self), self.name))
8983

9084
def __eq__(self, other):
9185
if not isinstance(other, FeatureService):

sdk/python/feast/feature_table.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import inspect
1514
from typing import Dict, List, MutableMapping, Optional, Union
1615

1716
import yaml
@@ -22,7 +21,6 @@
2221

2322
from feast.data_source import DataSource, KafkaSource, KinesisSource
2423
from feast.feature import Feature
25-
from feast.importer import get_calling_file_name
2624
from feast.loaders import yaml as feast_yaml
2725
from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto
2826
from feast.protos.feast.core.FeatureTable_pb2 import (
@@ -67,11 +65,12 @@ def __init__(
6765
self._created_timestamp: Optional[Timestamp] = None
6866
self._last_updated_timestamp: Optional[Timestamp] = None
6967

70-
self.defined_in = get_calling_file_name(inspect.stack())
71-
7268
def __str__(self):
7369
return str(MessageToJson(self.to_proto()))
7470

71+
def __hash__(self) -> int:
72+
return hash((id(self), self.name))
73+
7574
def __eq__(self, other):
7675
if not isinstance(other, FeatureTable):
7776
raise TypeError(

sdk/python/feast/feature_view.py

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import inspect
1514
import re
1615
import warnings
1716
from datetime import datetime, timedelta
@@ -25,7 +24,6 @@
2524
from feast.errors import RegistryInferenceFailure
2625
from feast.feature import Feature
2726
from feast.feature_view_projection import FeatureViewProjection
28-
from feast.importer import get_calling_file_name
2927
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
3028
from feast.protos.feast.core.FeatureView_pb2 import (
3129
FeatureViewMeta as FeatureViewMetaProto,
@@ -81,8 +79,6 @@ class FeatureView:
8179
last_updated_timestamp: Optional[datetime] = None
8280
materialization_intervals: List[Tuple[datetime, datetime]]
8381

84-
defined_in: str
85-
8682
@log_exceptions
8783
def __init__(
8884
self,
@@ -145,8 +141,6 @@ def __init__(
145141
self.created_timestamp: Optional[datetime] = None
146142
self.last_updated_timestamp: Optional[datetime] = None
147143

148-
self.defined_in = get_calling_file_name(inspect.stack())
149-
150144
def __repr__(self):
151145
items = (f"{k} = {v}" for k, v in self.__dict__.items())
152146
return f"<{self.__class__.__name__}({', '.join(items)})>"
@@ -155,7 +149,7 @@ def __str__(self):
155149
return str(MessageToJson(self.to_proto()))
156150

157151
def __hash__(self):
158-
return hash(self.name)
152+
return hash((id(self), self.name))
159153

160154
def __getitem__(self, item) -> FeatureViewProjection:
161155
assert isinstance(item, list)

sdk/python/feast/importer.py

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,14 +3,6 @@
33
from feast import errors
44

55

6-
def get_calling_file_name(stack) -> str:
7-
# Get two levels up from current, to ignore usage.py
8-
previous_stack_frame = stack[1]
9-
if "feast/usage.py" in previous_stack_frame.filename:
10-
previous_stack_frame = stack[2]
11-
return previous_stack_frame.filename
12-
13-
146
def get_class_from_type(module_name: str, class_name: str, class_type: str):
157
if not class_name.endswith(class_type):
168
raise errors.FeastClassInvalidName(class_name, class_type)

sdk/python/feast/on_demand_feature_view.py

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
import functools
2-
import inspect
32
from types import MethodType
43
from typing import Dict, List, Union, cast
54

@@ -12,7 +11,6 @@
1211
from feast.feature import Feature
1312
from feast.feature_view import FeatureView
1413
from feast.feature_view_projection import FeatureViewProjection
15-
from feast.importer import get_calling_file_name
1614
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
1715
OnDemandFeatureView as OnDemandFeatureViewProto,
1816
)
@@ -48,8 +46,6 @@ class OnDemandFeatureView:
4846
inputs: Dict[str, Union[FeatureView, RequestDataSource]]
4947
udf: MethodType
5048

51-
defined_in: str
52-
5349
@log_exceptions
5450
def __init__(
5551
self,
@@ -67,7 +63,8 @@ def __init__(
6763
self.inputs = inputs
6864
self.udf = udf
6965

70-
self.defined_in = get_calling_file_name(inspect.stack())
66+
def __hash__(self) -> int:
67+
return hash((id(self), self.name))
7168

7269
def to_proto(self) -> OnDemandFeatureViewProto:
7370
"""

sdk/python/feast/repo_operations.py

Lines changed: 38 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -31,11 +31,11 @@ def py_path_to_module(path: Path, repo_root: Path) -> str:
3131

3232

3333
class ParsedRepo(NamedTuple):
34-
feature_tables: List[FeatureTable]
35-
feature_views: List[FeatureView]
36-
on_demand_feature_views: List[OnDemandFeatureView]
37-
entities: List[Entity]
38-
feature_services: List[FeatureService]
34+
feature_tables: Set[FeatureTable]
35+
feature_views: Set[FeatureView]
36+
on_demand_feature_views: Set[OnDemandFeatureView]
37+
entities: Set[Entity]
38+
feature_services: Set[FeatureService]
3939

4040

4141
def read_feastignore(repo_root: Path) -> List[str]:
@@ -94,11 +94,11 @@ def get_repo_files(repo_root: Path) -> List[Path]:
9494
def parse_repo(repo_root: Path) -> ParsedRepo:
9595
""" Collect feature table definitions from feature repo """
9696
res = ParsedRepo(
97-
feature_tables=[],
98-
entities=[],
99-
feature_views=[],
100-
feature_services=[],
101-
on_demand_feature_views=[],
97+
feature_tables=set(),
98+
entities=set(),
99+
feature_views=set(),
100+
feature_services=set(),
101+
on_demand_feature_views=set(),
102102
)
103103

104104
for repo_file in get_repo_files(repo_root):
@@ -107,25 +107,15 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
107107
for attr_name in dir(module):
108108
obj = getattr(module, attr_name)
109109
if isinstance(obj, FeatureTable):
110-
assert obj.defined_in is not None
111-
if obj.defined_in == module.__file__:
112-
res.feature_tables.append(obj)
110+
res.feature_tables.add(obj)
113111
if isinstance(obj, FeatureView):
114-
assert obj.defined_in is not None
115-
if obj.defined_in == module.__file__:
116-
res.feature_views.append(obj)
112+
res.feature_views.add(obj)
117113
elif isinstance(obj, Entity):
118-
assert obj.defined_in is not None
119-
if obj.defined_in == module.__file__:
120-
res.entities.append(obj)
114+
res.entities.add(obj)
121115
elif isinstance(obj, FeatureService):
122-
assert obj.defined_in is not None
123-
if obj.defined_in == module.__file__:
124-
res.feature_services.append(obj)
116+
res.feature_services.add(obj)
125117
elif isinstance(obj, OnDemandFeatureView):
126-
assert obj.defined_in is not None
127-
if obj.defined_in == module.__file__:
128-
res.on_demand_feature_views.append(obj)
118+
res.on_demand_feature_views.add(obj)
129119
return res
130120

131121

@@ -146,7 +136,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
146136
registry._initialize_registry()
147137
sys.dont_write_bytecode = True
148138
repo = parse_repo(repo_path)
149-
_validate_feature_views(repo.feature_views)
139+
_validate_feature_views(list(repo.feature_views))
150140

151141
if not skip_source_validation:
152142
data_sources = [t.batch_source for t in repo.feature_views]
@@ -259,8 +249,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
259249
project,
260250
tables_to_delete=all_to_delete,
261251
tables_to_keep=all_to_keep,
262-
entities_to_delete=entities_to_delete,
263-
entities_to_keep=entities_to_keep,
252+
entities_to_delete=list(entities_to_delete),
253+
entities_to_keep=list(entities_to_keep),
264254
partial=False,
265255
)
266256

@@ -270,63 +260,63 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
270260

271261
def _tag_registry_entities_for_keep_delete(
272262
project: str, registry: Registry, repo: ParsedRepo
273-
) -> Tuple[List[Entity], List[Entity]]:
274-
entities_to_keep: List[Entity] = repo.entities
275-
entities_to_delete: List[Entity] = []
263+
) -> Tuple[Set[Entity], Set[Entity]]:
264+
entities_to_keep: Set[Entity] = repo.entities
265+
entities_to_delete: Set[Entity] = set()
276266
repo_entities_names = set([e.name for e in repo.entities])
277267
for registry_entity in registry.list_entities(project=project):
278268
if registry_entity.name not in repo_entities_names:
279-
entities_to_delete.append(registry_entity)
269+
entities_to_delete.add(registry_entity)
280270
return entities_to_keep, entities_to_delete
281271

282272

283273
def _tag_registry_views_for_keep_delete(
284274
project: str, registry: Registry, repo: ParsedRepo
285-
) -> Tuple[List[FeatureView], List[FeatureView]]:
286-
views_to_keep: List[FeatureView] = repo.feature_views
287-
views_to_delete: List[FeatureView] = []
275+
) -> Tuple[Set[FeatureView], Set[FeatureView]]:
276+
views_to_keep: Set[FeatureView] = repo.feature_views
277+
views_to_delete: Set[FeatureView] = set()
288278
repo_feature_view_names = set(t.name for t in repo.feature_views)
289279
for registry_view in registry.list_feature_views(project=project):
290280
if registry_view.name not in repo_feature_view_names:
291-
views_to_delete.append(registry_view)
281+
views_to_delete.add(registry_view)
292282
return views_to_keep, views_to_delete
293283

294284

295285
def _tag_registry_on_demand_feature_views_for_keep_delete(
296286
project: str, registry: Registry, repo: ParsedRepo
297-
) -> Tuple[List[OnDemandFeatureView], List[OnDemandFeatureView]]:
298-
odfvs_to_keep: List[OnDemandFeatureView] = repo.on_demand_feature_views
299-
odfvs_to_delete: List[OnDemandFeatureView] = []
287+
) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]:
288+
odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views
289+
odfvs_to_delete: Set[OnDemandFeatureView] = set()
300290
repo_on_demand_feature_view_names = set(
301291
t.name for t in repo.on_demand_feature_views
302292
)
303293
for registry_odfv in registry.list_on_demand_feature_views(project=project):
304294
if registry_odfv.name not in repo_on_demand_feature_view_names:
305-
odfvs_to_delete.append(registry_odfv)
295+
odfvs_to_delete.add(registry_odfv)
306296
return odfvs_to_keep, odfvs_to_delete
307297

308298

309299
def _tag_registry_tables_for_keep_delete(
310300
project: str, registry: Registry, repo: ParsedRepo
311-
) -> Tuple[List[FeatureTable], List[FeatureTable]]:
312-
tables_to_keep: List[FeatureTable] = repo.feature_tables
313-
tables_to_delete: List[FeatureTable] = []
301+
) -> Tuple[Set[FeatureTable], Set[FeatureTable]]:
302+
tables_to_keep: Set[FeatureTable] = repo.feature_tables
303+
tables_to_delete: Set[FeatureTable] = set()
314304
repo_table_names = set(t.name for t in repo.feature_tables)
315305
for registry_table in registry.list_feature_tables(project=project):
316306
if registry_table.name not in repo_table_names:
317-
tables_to_delete.append(registry_table)
307+
tables_to_delete.add(registry_table)
318308
return tables_to_keep, tables_to_delete
319309

320310

321311
def _tag_registry_services_for_keep_delete(
322312
project: str, registry: Registry, repo: ParsedRepo
323-
) -> Tuple[List[FeatureService], List[FeatureService]]:
324-
services_to_keep: List[FeatureService] = repo.feature_services
325-
services_to_delete: List[FeatureService] = []
313+
) -> Tuple[Set[FeatureService], Set[FeatureService]]:
314+
services_to_keep: Set[FeatureService] = repo.feature_services
315+
services_to_delete: Set[FeatureService] = set()
326316
repo_feature_service_names = set(t.name for t in repo.feature_services)
327317
for registry_service in registry.list_feature_services(project=project):
328318
if registry_service.name not in repo_feature_service_names:
329-
services_to_delete.append(registry_service)
319+
services_to_delete.add(registry_service)
330320
return services_to_keep, services_to_delete
331321

332322

0 commit comments

Comments
 (0)