-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathmatcher.py
More file actions
129 lines (106 loc) · 4.54 KB
/
matcher.py
File metadata and controls
129 lines (106 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
This module provides utility matching functions.
"""
import logging
import re
from typing import TYPE_CHECKING, Any, Optional
from unittest.mock import Mock
from feast.permissions.action import AuthzedAction
if TYPE_CHECKING:
from feast.feast_object import FeastObject
logger = logging.getLogger(__name__)
def is_a_feast_object(resource: Any):
    """
    Tell whether an object belongs to one of the managed Feast resource types.

    Args:
        resource: The object instance to inspect.

    Returns:
        `True` when `resource` is an instance of at least one entry of `ALL_RESOURCE_TYPES`
        (instances of subclasses included), `False` otherwise.
    """
    # Imported at call time instead of at module load
    # (presumably to avoid a circular import with `feast.feast_object` - TODO confirm).
    from feast.feast_object import ALL_RESOURCE_TYPES

    # `isinstance` is used on purpose so that Mock objects pass the validation.
    return any(isinstance(resource, candidate) for candidate in ALL_RESOURCE_TYPES)
def _get_type(resource: "FeastObject") -> Any:
    """
    Resolve the class that a resource should be reported as.

    Args:
        resource: The object to inspect; it may be a `unittest.mock.Mock` instance.

    Returns:
        `type(resource)` for a regular object. For a `Mock` instance, the value of its
        `_spec_class` attribute, or `None` when that attribute is not present.
    """
    if isinstance(resource, Mock):
        # The Mock class itself is not informative; report whatever `_spec_class` holds.
        return getattr(resource, "_spec_class", None)
    return type(resource)
def resource_match_config(
    resource: "FeastObject",
    expected_types: list["FeastObject"],
    name_pattern: Optional[str] = None,
    required_tags: Optional[dict[str, str]] = None,
) -> bool:
    """
    Match a given Feast object against the configured type, name and tags in a permission configuration.

    The filters are applied in order: managed-type check, expected types, name pattern, required tags.
    The name and tag filters only reject a resource on an actual mismatch: when the resource lacks
    the inspected attribute, or the attribute has an unexpected type, a warning is logged and the
    filter is treated as passed.

    Args:
        resource: A FeastObject instance to match against the permission.
        expected_types: The list of object types configured in the permission. Type match also includes all the sub-classes.
        name_pattern: The optional name pattern filter configured in the permission. It is a regular
            expression that must match the whole resource name.
        required_tags: The optional dictionary of required tags configured in the permission.

    Returns:
        bool: `True` if the resource matches the configured permission filters.
    """
    if resource is None:
        logger.warning(f"None passed to {resource_match_config.__name__}")
        return False

    if not is_a_feast_object(resource):
        # The type is only resolved here because it is needed for this message alone.
        logger.warning(
            f"Given resource is not of a managed type but {_get_type(resource)}"
        )
        return False

    # mypy check ignored because of https://github.com/python/mypy/issues/11673, or it raises "Argument 2 to "isinstance" has incompatible type "tuple[Featu ..."
    if not isinstance(resource, tuple(expected_types)):  # type: ignore
        logger.info(
            f"Resource does not match any of the expected type {expected_types}"
        )
        return False

    if name_pattern is not None and not _name_matches_pattern(resource, name_pattern):
        return False

    # An empty or missing tag dictionary means there is nothing to verify.
    if required_tags and not _has_required_tags(resource, required_tags):
        return False

    return True


def _name_matches_pattern(resource: "FeastObject", name_pattern: str) -> bool:
    """Return `False` only when the resource has a string `name` that does not fully match `name_pattern`."""
    if not hasattr(resource, "name"):
        logger.warning(f"Resource {resource} has no `name` attribute")
        return True

    name = resource.name
    if not isinstance(name, str):
        logger.warning(
            f"Resource {resource} has a `name` attribute of unexpected type {type(name)}"
        )
        return True

    if re.fullmatch(name_pattern, name) is None:
        logger.info(f"Resource name {name} does not match pattern {name_pattern}")
        return False
    return True


def _has_required_tags(resource: "FeastObject", required_tags: dict[str, str]) -> bool:
    """Return `False` only when the resource has a tag dictionary in which a required tag is missing or differs."""
    # NOTE(review): tags are read from the resource's `required_tags` attribute; confirm this is
    # the attribute under which resources actually expose their tags.
    if not hasattr(resource, "required_tags"):
        logger.warning(f"Resource {resource} has no `required_tags` attribute")
        return True

    actual_tags = resource.required_tags
    if not isinstance(actual_tags, dict):
        logger.warning(
            f"Resource {resource} has a `required_tags` attribute of unexpected type {type(actual_tags)}"
        )
        return True

    for tag, required_value in required_tags.items():
        # A tag that is absent on the resource yields `None` and is reported as a mismatch.
        actual_value = actual_tags.get(tag)
        if required_value != actual_value:
            logger.info(
                f"Unmatched value {actual_value} for required tag {tag}: expected {required_value}"
            )
            return False
    return True
def actions_match_config(
    requested_actions: list[AuthzedAction],
    allowed_actions: list[AuthzedAction],
) -> bool:
    """
    Check that every requested action is permitted by a permission configuration.

    Args:
        requested_actions: The actions that are about to be executed.
        allowed_actions: The actions configured in the permission.

    Returns:
        bool: `True` if each entry of `requested_actions` is found in `allowed_actions`
        (trivially `True` when no action is requested), `False` as soon as one is not.
    """
    for action in requested_actions:
        if action not in allowed_actions:
            return False
    return True