Skip to content

Commit c4b636f

Browse files
authored
chore: Configure feature logging via feature_store.yaml (#2739)
* Feature logging configurable via feature_store.yaml Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * allow local server type Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 1f89758 commit c4b636f

File tree

14 files changed

+160
-40
lines changed

14 files changed

+160
-40
lines changed

docs/reference/feature-servers/go-feature-retrieval.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,36 @@ go_feature_retrieval: True
3535
```
3636
{% endcode %}
3737
38+
## Feature logging
39+
40+
Go feature server can log all requested entities and served features to a configured destination inside an offline store.
41+
This allows users to create new datasets from features served online. Those datasets could be used for future trainings or for
42+
feature validations. To enable feature logging we need to edit `feature_store.yaml`:
43+
```yaml
44+
project: my_feature_repo
45+
registry: data/registry.db
46+
provider: local
47+
online_store:
48+
type: redis
49+
connection_string: "localhost:6379"
50+
go_feature_retrieval: True
51+
feature_server:
52+
feature_logging:
53+
enable: True
54+
```
55+
56+
Feature logging configuration in `feature_store.yaml` also allows to tweak some low-level parameters to achieve the best performance:
57+
```yaml
58+
feature_server:
59+
feature_logging:
60+
enable: True
61+
flush_interval_secs: 300
62+
write_to_disk_interval_secs: 30
63+
emit_timeout_micro_secs: 10000
64+
queue_capacity: 10000
65+
```
66+
All these parameters are optional.
67+
3868
## Future/Current Work
3969

4070
The Go feature retrieval online feature logging for Data Quality Monitoring is currently in development. More information can be found [here](https://docs.google.com/document/d/110F72d4NTv80p35wDSONxhhPBqWRwbZXG4f9mNEMd98/edit#heading=h.9gaqqtox9jg6).

sdk/python/feast/cli.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
registry_dump,
4242
teardown,
4343
)
44+
from feast.utils import maybe_local_tz
4445

4546
_logger = logging.getLogger(__name__)
47+
warnings.filterwarnings("ignore", category=DeprecationWarning, module="(?!feast)")
4648

4749

4850
class NoOptionDefaultFormat(click.Command):
@@ -803,8 +805,8 @@ def validate(
803805
result = store.validate_logged_features(
804806
source=feature_service,
805807
reference=reference,
806-
start=datetime.fromisoformat(start_ts),
807-
end=datetime.fromisoformat(end_ts),
808+
start=maybe_local_tz(datetime.fromisoformat(start_ts)),
809+
end=maybe_local_tz(datetime.fromisoformat(end_ts)),
808810
throw_exception=False,
809811
cache_profile=not no_profile_cache,
810812
)

sdk/python/feast/dqm/profilers/ge_profiler.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import pandas as pd
99
from great_expectations.core import ExpectationSuite
1010
from great_expectations.dataset import PandasDataset
11-
from great_expectations.profile.base import ProfilerTypeMapping
1211

1312
from feast.dqm.profilers.profiler import (
1413
Profile,
@@ -29,9 +28,7 @@ def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
2928
dataset_copy = dataset.copy(deep=True)
3029

3130
for column in dataset.columns:
32-
if dataset.expect_column_values_to_be_in_type_list(
33-
column, type_list=sorted(list(ProfilerTypeMapping.DATETIME_TYPE_NAMES))
34-
).success:
31+
if pd.api.types.is_datetime64_any_dtype(dataset[column]):
3532
# GE cannot parse Timestamp or other pandas datetime time
3633
dataset_copy[column] = dataset[column].dt.strftime("%Y-%m-%dT%H:%M:%S")
3734

sdk/python/feast/embedded_go/online_features_service.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
RequestDataNotFoundInEntityRowsException,
1212
)
1313
from feast.feature_service import FeatureService
14+
from feast.infra.feature_servers.base_config import FeatureLoggingConfig
1415
from feast.online_response import OnlineResponse
1516
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
1617
from feast.protos.feast.types import Value_pb2
@@ -30,6 +31,11 @@
3031
if TYPE_CHECKING:
3132
from feast.feature_store import FeatureStore
3233

34+
NANO_SECOND = 1
35+
MICRO_SECOND = 1000 * NANO_SECOND
36+
MILLI_SECOND = 1000 * MICRO_SECOND
37+
SECOND = 1000 * MILLI_SECOND
38+
3339

3440
class EmbeddedOnlineFeatureServer:
3541
def __init__(
@@ -144,12 +150,22 @@ def start_grpc_server(
144150
host: str,
145151
port: int,
146152
enable_logging: bool = True,
147-
logging_options: Optional[LoggingOptions] = None,
153+
logging_options: Optional[FeatureLoggingConfig] = None,
148154
):
149155
if enable_logging:
150156
if logging_options:
151157
self._service.StartGprcServerWithLogging(
152-
host, port, self._logging_callback, logging_options
158+
host,
159+
port,
160+
self._logging_callback,
161+
LoggingOptions(
162+
FlushInterval=logging_options.flush_interval_secs * SECOND,
163+
WriteInterval=logging_options.write_to_disk_interval_secs
164+
* SECOND,
165+
EmitTimeout=logging_options.emit_timeout_micro_secs
166+
* MICRO_SECOND,
167+
ChannelCapacity=logging_options.queue_capacity,
168+
),
153169
)
154170
else:
155171
self._service.StartGprcServerWithLoggingDefaultOpts(
@@ -163,12 +179,22 @@ def start_http_server(
163179
host: str,
164180
port: int,
165181
enable_logging: bool = True,
166-
logging_options: Optional[LoggingOptions] = None,
182+
logging_options: Optional[FeatureLoggingConfig] = None,
167183
):
168184
if enable_logging:
169185
if logging_options:
170186
self._service.StartHttpServerWithLogging(
171-
host, port, self._logging_callback, logging_options
187+
host,
188+
port,
189+
self._logging_callback,
190+
LoggingOptions(
191+
FlushInterval=logging_options.flush_interval_secs * SECOND,
192+
WriteInterval=logging_options.write_to_disk_interval_secs
193+
* SECOND,
194+
EmitTimeout=logging_options.emit_timeout_micro_secs
195+
* MICRO_SECOND,
196+
ChannelCapacity=logging_options.queue_capacity,
197+
),
172198
)
173199
else:
174200
self._service.StartHttpServerWithLoggingDefaultOpts(

sdk/python/feast/feature_store.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,13 +2008,30 @@ def serve(
20082008
if self.config.go_feature_retrieval:
20092009
# Start go server instead of python if the flag is enabled
20102010
self._lazy_init_go_server()
2011+
enable_logging = (
2012+
self.config.feature_server
2013+
and self.config.feature_server.feature_logging
2014+
and self.config.feature_server.feature_logging.enabled
2015+
and not no_feature_log
2016+
)
2017+
logging_options = (
2018+
self.config.feature_server.feature_logging
2019+
if enable_logging and self.config.feature_server
2020+
else None
2021+
)
20112022
if type_ == "http":
20122023
self._go_server.start_http_server(
2013-
host, port, enable_logging=not no_feature_log
2024+
host,
2025+
port,
2026+
enable_logging=enable_logging,
2027+
logging_options=logging_options,
20142028
)
20152029
elif type_ == "grpc":
20162030
self._go_server.start_grpc_server(
2017-
host, port, enable_logging=not no_feature_log
2031+
host,
2032+
port,
2033+
enable_logging=enable_logging,
2034+
logging_options=logging_options,
20182035
)
20192036
else:
20202037
raise ValueError(
@@ -2139,12 +2156,14 @@ def validate_logged_features(
21392156

21402157
# read and run validation
21412158
try:
2142-
j.to_arrow(validation_reference=reference)
2159+
t = j.to_arrow(validation_reference=reference)
21432160
except ValidationFailed as exc:
21442161
if throw_exception:
21452162
raise
21462163

21472164
return exc
2165+
else:
2166+
print(f"{t.shape[0]} rows were validated.")
21482167

21492168
if cache_profile:
21502169
self.apply(reference)

sdk/python/feast/infra/feature_servers/aws_lambda/config.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
from pydantic import StrictBool, StrictStr
22
from pydantic.typing import Literal
33

4-
from feast.repo_config import FeastConfigBaseModel
4+
from feast.infra.feature_servers.base_config import BaseFeatureServerConfig
55

66

7-
class AwsLambdaFeatureServerConfig(FeastConfigBaseModel):
7+
class AwsLambdaFeatureServerConfig(BaseFeatureServerConfig):
88
"""Feature server config for AWS Lambda."""
99

1010
type: Literal["aws_lambda"] = "aws_lambda"
1111
"""Feature server type selector."""
1212

13-
enabled: StrictBool = False
14-
"""Whether the feature server should be launched."""
15-
1613
public: StrictBool = True
1714
"""Whether the endpoint should be publicly accessible."""
1815

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Optional
2+
3+
from pydantic import StrictBool, StrictInt
4+
5+
from feast.repo_config import FeastConfigBaseModel
6+
7+
8+
class FeatureLoggingConfig(FeastConfigBaseModel):
9+
enabled: StrictBool = False
10+
"""Whether the feature server should log served features."""
11+
12+
flush_interval_secs: StrictInt = 600
13+
"""Interval of flushing logs to the destination in offline store."""
14+
15+
write_to_disk_interval_secs: StrictInt = 30
16+
"""Interval of dumping logs collected in memory to local disk."""
17+
18+
queue_capacity: StrictInt = 100000
19+
"""Log queue capacity. If number of produced logs is bigger than
20+
processing speed logs will be accumulated in the queue.
21+
After queue length will reach this number all new items will be rejected."""
22+
23+
emit_timeout_micro_secs: StrictInt = 10000
24+
"""Timeout for adding new log item to the queue."""
25+
26+
27+
class BaseFeatureServerConfig(FeastConfigBaseModel):
28+
"""Base Feature Server config that should be extended"""
29+
30+
enabled: StrictBool = False
31+
"""Whether the feature server should be launched."""
32+
33+
feature_logging: Optional[FeatureLoggingConfig]
34+
""" Feature logging configuration """

sdk/python/feast/infra/feature_servers/gcp_cloudrun/config.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
from pydantic import StrictBool
22
from pydantic.typing import Literal
33

4-
from feast.repo_config import FeastConfigBaseModel
4+
from feast.infra.feature_servers.base_config import BaseFeatureServerConfig
55

66

7-
class GcpCloudRunFeatureServerConfig(FeastConfigBaseModel):
7+
class GcpCloudRunFeatureServerConfig(BaseFeatureServerConfig):
88
"""Feature server config for GCP CloudRun."""
99

1010
type: Literal["gcp_cloudrun"] = "gcp_cloudrun"
1111
"""Feature server type selector."""
1212

13-
enabled: StrictBool = False
14-
"""Whether the feature server should be launched."""
15-
1613
public: StrictBool = True
1714
"""Whether the endpoint should be publicly accessible."""
1815

sdk/python/feast/infra/feature_servers/local_process/__init__.py

Whitespace-only changes.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from pydantic.typing import Literal
2+
3+
from feast.infra.feature_servers.base_config import BaseFeatureServerConfig
4+
5+
6+
class LocalFeatureServerConfig(BaseFeatureServerConfig):
7+
type: Literal["local"] = "local"
8+
"""Feature server type selector."""

0 commit comments

Comments
 (0)