Skip to content
Merged
Prev Previous commit
Next Next commit
add prefixes to system fields
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 26, 2022
commit 4b983d6ae58bab81ebeff766016eb3b8e54cdd67
15 changes: 10 additions & 5 deletions sdk/python/feast/feature_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
from feast.registry import Registry


# Names of the system (non-feature) columns written to every log table.
# The double-underscore prefix marks them as system fields so they cannot
# collide with user-defined feature or entity column names.
REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"


class LoggingSource:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check my understanding: users should be able to instantiate LoggingSources that act as wrappers around existing data sources, right? E.g., we append to an already-existing BigQuery table that already contains the equivalent of these features?

Copy link
Copy Markdown
Collaborator Author

@pyalex pyalex Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly.

  1. Users don't instantiate a logging source themselves; it is created mostly internally, depending on where the logs are coming from.
  2. Currently the only available logging source is a feature server. Other examples could be a materialization job or a streaming job.
  3. We do not append to feature sources (like BigQueryDataSource). Instead, logs are written to a LoggingDestination, which is defined by the user as part of the LoggingConfig in a feature service (see the changes to the proto).
  4. A LoggingDestination can be converted to a DataSource (see LoggingDestination.to_data_source) when logs are loaded from the offline store.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a docstring.

"""
A logging source describes an object that produces logs (e.g., a feature service produces logs of served features).
Expand Down Expand Up @@ -92,19 +97,19 @@ def get_schema(self, registry: "Registry") -> pa.Schema:
]

# system columns
fields["request_id"] = pa.string()
fields["log_timestamp"] = pa.timestamp("us", tz=UTC)
fields["log_date"] = pa.date32()
fields[REQUEST_ID_FIELD] = pa.string()
fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)
fields[LOG_DATE_FIELD] = pa.date32()

return pa.schema(
[pa.field(name, data_type) for name, data_type in fields.items()]
)

def get_partition_column(self, registry: "Registry") -> str:
    """Return the name of the column used to partition log records.

    Logs are partitioned by calendar date; the returned name is the
    system date column added by ``get_schema`` (``LOG_DATE_FIELD``),
    not a hard-coded literal, so schema and partitioning stay in sync.
    """
    return LOG_DATE_FIELD

def get_log_timestamp_column(self) -> str:
    """Return the name of the column holding each log record's timestamp.

    Uses the shared ``LOG_TIMESTAMP_FIELD`` constant (the same name
    ``get_schema`` emits) rather than a duplicated string literal.
    """
    return LOG_TIMESTAMP_FIELD


class _DestinationRegistry(type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
import pytest
from google.api_core.exceptions import NotFound

from feast.feature_logging import FeatureServiceLoggingSource, LoggingConfig
from feast.feature_logging import (
LOG_DATE_FIELD,
LOG_TIMESTAMP_FIELD,
REQUEST_ID_FIELD,
FeatureServiceLoggingSource,
LoggingConfig,
)
from feast.feature_service import FeatureService
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.wait import wait_retry_backoff
Expand Down Expand Up @@ -59,13 +65,13 @@ def test_feature_service_logging(environment, universal_data_sources):
store.write_logged_features(
source=feature_service, logs=pa.Table.from_pandas(second_batch, schema=schema),
)
expected_columns = list(set(logs_df.columns) - {"log_date"})
expected_columns = list(set(logs_df.columns) - {LOG_DATE_FIELD})

def retrieve():
retrieval_job = store._get_provider().retrieve_feature_service_logs(
feature_service=feature_service,
start_date=logs_df["log_timestamp"].min(),
end_date=logs_df["log_timestamp"].max() + datetime.timedelta(seconds=1),
start_date=logs_df[LOG_TIMESTAMP_FIELD].min(),
end_date=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1),
config=store.config,
registry=store._registry,
)
Expand All @@ -84,8 +90,8 @@ def retrieve():
persisted_logs = persisted_logs[expected_columns]
logs_df = logs_df[expected_columns]
pd.testing.assert_frame_equal(
logs_df.sort_values("request_id").reset_index(drop=True),
persisted_logs.sort_values("request_id").reset_index(drop=True),
logs_df.sort_values(REQUEST_ID_FIELD).reset_index(drop=True),
persisted_logs.sort_values(REQUEST_ID_FIELD).reset_index(drop=True),
check_dtype=False,
)

Expand All @@ -97,11 +103,11 @@ def prepare_logs(datasets: UniversalDatasets) -> pd.DataFrame:
num_rows = driver_df.shape[0]

logs_df = driver_df[["driver_id", "val_to_add"]]
logs_df["request_id"] = [str(uuid.uuid4()) for _ in range(num_rows)]
logs_df["log_timestamp"] = pd.Series(
logs_df[REQUEST_ID_FIELD] = [str(uuid.uuid4()) for _ in range(num_rows)]
logs_df[LOG_TIMESTAMP_FIELD] = pd.Series(
np.random.randint(0, 7 * 24 * 3600, num_rows)
).map(lambda secs: pd.Timestamp.utcnow() - datetime.timedelta(seconds=secs))
logs_df["log_date"] = logs_df["log_timestamp"].dt.date
logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date

for view, features in (
("driver_stats", ("conv_rate", "avg_daily_trips")),
Expand Down