Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
write logged features by path
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 29, 2022
commit fb26fe8f5050af0e3b48c0fd60acc4392a77b88e
18 changes: 16 additions & 2 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,23 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
}

// StartGprcServer starts the gRPC serving server without feature logging.
// TODO(oleksii): enable logging here once it is ready to be on by default.
func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
	return s.StartGprcServerWithLogging(host, port, nil)
}

func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
var loggingService *logging.LoggingService = nil
var err error
if writeLoggedFeaturesCallback != nil {
sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
if err != nil {
return err
}

loggingService, err = logging.NewLoggingService(s.fs, sink)
if err != nil {
return err
}
}
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/filelogsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *FileLogSink) Write(record arrow.Record) error {
return pqarrow.WriteTable(table, writer, 100, props, arrProps)
}

func (s *FileLogSink) Flush() error {
func (s *FileLogSink) Flush(featureServiceName string) error {
// files are already flushed during Write
return nil
}
6 changes: 3 additions & 3 deletions go/internal/feast/server/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type LogSink interface {
// Flush actually send data to a sink.
// We want to control amount to interaction with sink, since it could be a costly operation.
// Also, some sinks like BigQuery might have quotas and physically limit the amount of write requests per day.
Flush() error
Flush(featureSeviceName string) error
}

type Logger interface {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
if err != nil {
log.Printf("Log write failed: %+v", err)
}
err = l.sink.Flush()
err = l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
Expand All @@ -155,7 +155,7 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
log.Printf("Log write failed: %+v", err)
}
case <-time.After(l.config.FlushInterval):
err := l.sink.Flush()
err := l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
Expand Down
82 changes: 82 additions & 0 deletions go/internal/feast/server/logging/offlinestoresink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package logging

import (
"errors"
"fmt"
"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/parquet"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/google/uuid"
"io"
"io/ioutil"
"os"
"path/filepath"
)

// OfflineStoreWriteCallback pushes the parquet dataset stored in datasetDir to
// the offline store on behalf of the named feature service. It returns an
// empty string on success, or a non-empty error message on failure.
type OfflineStoreWriteCallback func(featureServiceName, datasetDir string) string
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should specify name as string too?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a short form


// OfflineStoreSink accumulates logged feature records as parquet files in a
// temporary directory and delivers that directory to a write callback on Flush.
type OfflineStoreSink struct {
	// datasetDir is the lazily created temporary directory holding the parquet
	// files of the current dataset; empty until the first Write after a flush.
	datasetDir string
	// writeCallback hands the finished dataset directory to the offline store.
	writeCallback OfflineStoreWriteCallback
}

// NewOfflineStoreSink returns a sink that forwards logged features to the
// offline store through the supplied write callback.
func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error) {
	sink := &OfflineStoreSink{writeCallback: writeCallback}
	return sink, nil
}

// getOrCreateDatasetDir returns the directory collecting the current dataset's
// parquet files, creating a fresh temporary directory on first use after a flush.
func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
	if s.datasetDir == "" {
		dir, err := ioutil.TempDir("", "*")
		if err != nil {
			return "", err
		}
		s.datasetDir = dir
	}
	return s.datasetDir, nil
}

// cleanCurrentDatasetDir removes the current dataset directory (if any) and
// resets the sink so that the next Write starts a brand-new dataset.
//
// Bug fix: the guard was inverted (`!= ""` returned nil), so the temporary
// directory was never deleted, datasetDir was never reset, and already-flushed
// files would have been delivered to the offline store again on every Flush.
func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
	if s.datasetDir == "" {
		// Nothing has been written since the last flush.
		return nil
	}
	datasetDir := s.datasetDir
	s.datasetDir = ""
	return os.RemoveAll(datasetDir)
}

// Write serializes a single arrow record into a new uniquely-named parquet
// file inside the current dataset directory. Files accumulate there until
// Flush hands the whole directory over to the write callback.
//
// Fixes: the uuid error was silently discarded, the created file was never
// closed (fd leak; data not guaranteed durable), and the arrow table was
// never released.
func (s *OfflineStoreSink) Write(record arrow.Record) error {
	fileName, err := uuid.NewUUID()
	if err != nil {
		return err
	}
	datasetDir, err := s.getOrCreateDatasetDir()
	if err != nil {
		return err
	}

	f, err := os.Create(filepath.Join(datasetDir, fmt.Sprintf("%s.parquet", fileName.String())))
	if err != nil {
		return err
	}
	var writer io.Writer = f

	table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
	defer table.Release()

	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
	arrProps := pqarrow.DefaultWriterProps()
	if err := pqarrow.WriteTable(table, writer, 1000, props, arrProps); err != nil {
		f.Close()
		return err
	}
	// Surface Close errors: a failed close can mean the parquet footer was
	// not fully persisted to disk.
	return f.Close()
}

// Flush delivers the accumulated parquet dataset to the offline store write
// callback and, on success, deletes the temporary dataset directory. It is a
// no-op when nothing has been written since the previous flush.
func (s *OfflineStoreSink) Flush(featureServiceName string) error {
	if s.datasetDir == "" {
		return nil
	}

	if errMsg := s.writeCallback(featureServiceName, s.datasetDir); errMsg != "" {
		return errors.New(errMsg)
	}

	return s.cleanCurrentDatasetDir()
}
15 changes: 14 additions & 1 deletion sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import pyarrow as pa
Expand Down Expand Up @@ -132,7 +133,7 @@ def get_online_features(
resp = record_batch_to_online_response(record_batch)
return OnlineResponse(resp)

def start_grpc_server(self, host: str, port: int):
def start_grpc_server(self, host: str, port: int, enable_logging=True):
    # Start the embedded Go gRPC serving server on the given host/port.
    # NOTE(review): `enable_logging` is accepted but currently ignored — the
    # plain StartGprcServer binding is called regardless; confirm this flag is
    # wired through (e.g. via StartGprcServerWithLogging) before relying on it.
    self._service.StartGprcServer(host, port)

def stop_grpc_server(self):
Expand Down Expand Up @@ -182,6 +183,18 @@ def transformation_callback(
return output_record.num_rows


def logging_callback(
    fs: "FeatureStore", feature_service_name: str, dataset_dir: str,
) -> str:
    """Bridge between the Go logging sink and the Python offline store.

    Looks up the feature service by name and writes the on-disk parquet
    dataset produced by the Go logger to the configured offline store.

    Returns an empty string on success, or ``repr`` of the raised exception
    so the Go side can surface it as an error message.
    """
    service = fs.get_feature_service(feature_service_name, allow_cache=True)
    try:
        fs.write_logged_features(logs=Path(dataset_dir), source=service)
    except Exception as exc:
        return repr(exc)
    else:
        return ""  # empty string signals "no error" to the Go caller


def allocate_schema_and_array():
c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
Expand Down
10 changes: 9 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1988,10 +1988,16 @@ def serve_transformations(self, port: int) -> None:
def _teardown_go_server(self):
self._go_server = None

def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]):
def write_logged_features(
self, logs: Union[pa.Table, Path], source: Union[FeatureService]
):
"""
Write logs produced by a source (currently only feature service is supported as a source)
to an offline store.

Args:
logs: Arrow Table or path to parquet dataset directory on disk
source: Object that produces logs
"""
if not isinstance(source, FeatureService):
raise ValueError("Only feature service is currently supported as a source")
Expand All @@ -2000,6 +2006,8 @@ def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]):
source.logging_config is not None
), "Feature service must be configured with logging config in order to use this functionality"

assert isinstance(logs, (pa.Table, Path))

self._get_provider().write_feature_service_logs(
feature_service=source,
logs=logs,
Expand Down
15 changes: 14 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import contextlib
import os
import tempfile
import uuid
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import (
Callable,
ContextManager,
Expand Down Expand Up @@ -258,7 +260,7 @@ def query_generator() -> Iterator[str]:
@staticmethod
def write_logged_features(
config: RepoConfig,
data: pyarrow.Table,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
Expand All @@ -280,6 +282,17 @@ def write_logged_features(
),
)

if isinstance(data, Path):
for file in data.iterdir():
with file.open("rb") as f:
client.load_table_from_file(
file_obj=f,
destination=destination.table,
job_config=job_config,
)

return

with tempfile.TemporaryFile() as parquet_temp_file:
pyarrow.parquet.write_table(table=data, where=parquet_temp_file)

Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union

import dask.dataframe as dd
Expand Down Expand Up @@ -375,14 +376,17 @@ def pull_all_from_table_or_query(
@staticmethod
def write_logged_features(
config: RepoConfig,
data: pyarrow.Table,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
):
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)

if isinstance(data, Path):
data = pyarrow.parquet.read_table(data)

filesystem, path = FileSource.create_filesystem_and_path(
destination.path, destination.s3_endpoint_override,
)
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import warnings
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union

import pandas as pd
Expand Down Expand Up @@ -246,7 +247,7 @@ def pull_all_from_table_or_query(
@staticmethod
def write_logged_features(
config: RepoConfig,
data: pyarrow.Table,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
Expand All @@ -259,7 +260,7 @@ def write_logged_features(

Args:
config: Repo configuration object
data: Arrow table produced by logging source.
data: Arrow table or path to parquet directory that contains logs dataset.
source: Logging source that provides schema and some additional metadata.
logging_config: used to determine destination
registry: Feast registry
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import uuid
from datetime import datetime
from pathlib import Path
from typing import (
Callable,
ContextManager,
Expand Down Expand Up @@ -265,7 +266,7 @@ def query_generator() -> Iterator[str]:
@staticmethod
def write_logged_features(
config: RepoConfig,
data: pyarrow.Table,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
Expand All @@ -277,7 +278,10 @@ def write_logged_features(
config.offline_store.region
)
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)
s3_path = f"{config.offline_store.s3_staging_location}/logged_features/{uuid.uuid4()}.parquet"
if isinstance(data, Path):
s3_path = f"{config.offline_store.s3_staging_location}/logged_features/{uuid.uuid4()}"
else:
s3_path = f"{config.offline_store.s3_staging_location}/logged_features/{uuid.uuid4()}.parquet"

aws_utils.upload_arrow_table_to_redshift(
table=data,
Expand Down
24 changes: 17 additions & 7 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
execute_snowflake_statement,
get_snowflake_conn,
write_pandas,
write_parquet,
)
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -280,7 +281,7 @@ def query_generator() -> Iterator[str]:
@staticmethod
def write_logged_features(
config: RepoConfig,
data: pyarrow.Table,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
Expand All @@ -289,12 +290,21 @@ def write_logged_features(

snowflake_conn = get_snowflake_conn(config.offline_store)

write_pandas(
snowflake_conn,
data.to_pandas(),
table_name=logging_config.destination.table_name,
auto_create_table=True,
)
if isinstance(data, Path):
write_parquet(
snowflake_conn,
data,
source.get_schema(registry),
table_name=logging_config.destination.table_name,
auto_create_table=True,
)
else:
write_pandas(
snowflake_conn,
data.to_pandas(),
table_name=logging_config.destination.table_name,
auto_create_table=True,
)


class SnowflakeRetrievalJob(RetrievalJob):
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def retrieve_saved_dataset(
def write_feature_service_logs(
self,
feature_service: FeatureService,
logs: pyarrow.Table,
logs: Union[pyarrow.Table, str],
config: RepoConfig,
registry: Registry,
):
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def retrieve_saved_dataset(
def write_feature_service_logs(
self,
feature_service: FeatureService,
logs: pyarrow.Table,
logs: Union[pyarrow.Table, Path],
config: RepoConfig,
registry: Registry,
):
Expand All @@ -199,6 +199,8 @@ def write_feature_service_logs(

Schema of logs table is being inferred from the provided feature service.
Only feature services with configured logging are accepted.

Logs dataset can be passed as Arrow Table or path to parquet directory.
"""
...

Expand Down
Loading