-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Write logged features to Offline Store (Go - Python integration) #2621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
fb26fe8
37f85f4
081eed4
b5ca9ac
5f22182
25fc614
f2ed4ef
83f51a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,23 +8,22 @@ import ( | |
| "os" | ||
| "os/signal" | ||
| "syscall" | ||
|
|
||
| "google.golang.org/grpc" | ||
|
|
||
| "github.com/feast-dev/feast/go/internal/feast/server" | ||
| "github.com/feast-dev/feast/go/internal/feast/server/logging" | ||
| "github.com/feast-dev/feast/go/protos/feast/serving" | ||
| "time" | ||
|
|
||
| "github.com/apache/arrow/go/v8/arrow" | ||
| "github.com/apache/arrow/go/v8/arrow/array" | ||
| "github.com/apache/arrow/go/v8/arrow/cdata" | ||
| "github.com/apache/arrow/go/v8/arrow/memory" | ||
| "google.golang.org/grpc" | ||
|
|
||
| "github.com/feast-dev/feast/go/internal/feast" | ||
| "github.com/feast-dev/feast/go/internal/feast/model" | ||
| "github.com/feast-dev/feast/go/internal/feast/onlineserving" | ||
| "github.com/feast-dev/feast/go/internal/feast/registry" | ||
| "github.com/feast-dev/feast/go/internal/feast/server" | ||
| "github.com/feast-dev/feast/go/internal/feast/server/logging" | ||
| "github.com/feast-dev/feast/go/internal/feast/transformation" | ||
| "github.com/feast-dev/feast/go/protos/feast/serving" | ||
| prototypes "github.com/feast-dev/feast/go/protos/feast/types" | ||
| "github.com/feast-dev/feast/go/types" | ||
| ) | ||
|
|
@@ -44,6 +43,15 @@ type DataTable struct { | |
| SchemaPtr uintptr | ||
| } | ||
|
|
||
// LoggingOptions is the exported counterpart of logging.LoggingOptions.
// StartGprcServerWithLogging copies it field by field into a
// logging.LoggingOptions value; the meaning of each field is documented on
// logging.LoggingOptions.
type LoggingOptions struct {
	ChannelCapacity int

	// Duration-valued settings, forwarded unchanged to the logging package.
	EmitTimeout, WriteInterval, FlushInterval time.Duration
}
|
|
||
| func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService { | ||
| repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig) | ||
| if err != nil { | ||
|
|
@@ -215,16 +223,44 @@ func (s *OnlineFeatureService) GetOnlineFeatures( | |
| } | ||
|
|
||
| func (s *OnlineFeatureService) StartGprcServer(host string, port int) error { | ||
| // TODO(oleksii): enable logging | ||
| // Disable logging for now | ||
| return s.StartGprcServerWithLogging(host, port, nil, LoggingOptions{}) | ||
| } | ||
|
|
||
| func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error { | ||
| defaultOpts := LoggingOptions{ | ||
| ChannelCapacity: logging.DefaultOptions.ChannelCapacity, | ||
| EmitTimeout: logging.DefaultOptions.EmitTimeout, | ||
| WriteInterval: logging.DefaultOptions.WriteInterval, | ||
| FlushInterval: logging.DefaultOptions.FlushInterval, | ||
| } | ||
| return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts) | ||
| } | ||
|
|
||
| func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error { | ||
| var loggingService *logging.LoggingService = nil | ||
| var err error | ||
| if writeLoggedFeaturesCallback != nil { | ||
| sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{ | ||
| ChannelCapacity: loggingOpts.ChannelCapacity, | ||
| EmitTimeout: loggingOpts.EmitTimeout, | ||
| WriteInterval: loggingOpts.WriteInterval, | ||
| FlushInterval: loggingOpts.FlushInterval, | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| ser := server.NewGrpcServingServiceServer(s.fs, loggingService) | ||
| log.Printf("Starting a gRPC server on host %s port %d\n", host, port) | ||
| lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| log.Printf("Listening a gRPC server on host %s port %d\n", host, port) | ||
|
|
||
| grpcServer := grpc.NewServer() | ||
| serving.RegisterServingServiceServer(grpcServer, ser) | ||
|
|
@@ -234,6 +270,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error { | |
| <-s.grpcStopCh | ||
| fmt.Println("Stopping the gRPC server...") | ||
| grpcServer.GracefulStop() | ||
| if loggingService != nil { | ||
| loggingService.Stop() | ||
| } | ||
| fmt.Println("gRPC server terminated") | ||
|
Comment on lines
276
to
+281
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should really move some of this stuff to use a proper logger. |
||
| }() | ||
|
|
||
| err = grpcServer.Serve(lis) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package logging | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "io/ioutil" | ||
| "os" | ||
| "path/filepath" | ||
|
|
||
| "github.com/apache/arrow/go/v8/arrow" | ||
| "github.com/apache/arrow/go/v8/arrow/array" | ||
| "github.com/apache/arrow/go/v8/parquet" | ||
| "github.com/apache/arrow/go/v8/parquet/pqarrow" | ||
| "github.com/google/uuid" | ||
| ) | ||
|
|
||
// OfflineStoreWriteCallback is called by OfflineStoreSink.Flush with the
// feature service name and the directory that holds the parquet files written
// so far. A non-empty return value is treated as an error message; an empty
// string means the write succeeded.
type OfflineStoreWriteCallback func(featureServiceName string, datasetDir string) string
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Probably should specify name as string too?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a short form |
||
|
|
||
| type OfflineStoreSink struct { | ||
| datasetDir string | ||
| writeCallback OfflineStoreWriteCallback | ||
| } | ||
|
|
||
| func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error) { | ||
| return &OfflineStoreSink{ | ||
| datasetDir: "", | ||
| writeCallback: writeCallback, | ||
| }, nil | ||
| } | ||
|
|
||
| func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) { | ||
| if s.datasetDir != "" { | ||
| return s.datasetDir, nil | ||
| } | ||
| dir, err := ioutil.TempDir("", "*") | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| s.datasetDir = dir | ||
| return s.datasetDir, nil | ||
| } | ||
|
|
||
| func (s *OfflineStoreSink) cleanCurrentDatasetDir() error { | ||
| if s.datasetDir == "" { | ||
| return nil | ||
| } | ||
| datasetDir := s.datasetDir | ||
| s.datasetDir = "" | ||
| return os.RemoveAll(datasetDir) | ||
| } | ||
|
|
||
| func (s *OfflineStoreSink) Write(record arrow.Record) error { | ||
| fileName, _ := uuid.NewUUID() | ||
| datasetDir, err := s.getOrCreateDatasetDir() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var writer io.Writer | ||
| writer, err = os.Create(filepath.Join(datasetDir, fmt.Sprintf("%s.parquet", fileName.String()))) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record}) | ||
|
|
||
| props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)) | ||
| arrProps := pqarrow.DefaultWriterProps() | ||
| return pqarrow.WriteTable(table, writer, 1000, props, arrProps) | ||
| } | ||
|
|
||
| func (s *OfflineStoreSink) Flush(featureServiceName string) error { | ||
| if s.datasetDir == "" { | ||
| return nil | ||
| } | ||
|
|
||
| errMsg := s.writeCallback(featureServiceName, s.datasetDir) | ||
| if errMsg != "" { | ||
| return errors.New(errMsg) | ||
| } | ||
|
|
||
| return s.cleanCurrentDatasetDir() | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference between default logging opts and no logging opts? Is logging disabled? Can we add comments for the methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added