From fa653f1732612b0d317e0d1a478f8985280ddb88 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Wed, 27 Apr 2022 00:38:23 -0700 Subject: [PATCH 1/3] CLI command 'feast serve' should start go-based server if flag is enabled Signed-off-by: Tsotne Tabidze --- Makefile | 2 +- .../logging/feature_repo => }/__init__.py | 0 .../logging/feature_repo/data/online_store.db | Bin 16384 -> 0 bytes go/cmd/server/main.go | 74 ------------------ go/embedded/online_features.go | 38 +++++++++ go/internal/__init__.py | 0 go/internal/feast/__init__.py | 0 go/internal/feast/onlinestore/onlinestore.go | 8 +- .../feast/server/grpc_server.go} | 18 +++-- .../feast/server/grpc_server_test.go} | 6 +- .../server/logging/feature_repo/__init__.py | 0 .../logging/feature_repo/driver_stats.parquet | Bin .../server/logging/feature_repo/example.py | 0 .../logging/feature_repo/feature_store.yaml | 0 .../feast}/server/logging/filelogstorage.go | 0 .../server/logging/filelogstorage_test.go | 0 .../feast}/server/logging/logging.go | 0 .../feast}/server/logging/logging_test.go | 0 .../server/logging/offlinelogstorage.go | 0 .../embedded_go/online_features_service.py | 11 +++ sdk/python/feast/feature_store.py | 31 +++++--- 21 files changed, 89 insertions(+), 99 deletions(-) rename go/{cmd/server/logging/feature_repo => }/__init__.py (100%) delete mode 100644 go/cmd/server/logging/feature_repo/data/online_store.db delete mode 100644 go/cmd/server/main.go create mode 100644 go/internal/__init__.py create mode 100644 go/internal/feast/__init__.py rename go/{cmd/server/server.go => internal/feast/server/grpc_server.go} (76%) rename go/{cmd/server/server_test.go => internal/feast/server/grpc_server_test.go} (98%) create mode 100644 go/internal/feast/server/logging/feature_repo/__init__.py rename go/{cmd => internal/feast}/server/logging/feature_repo/driver_stats.parquet (100%) rename go/{cmd => internal/feast}/server/logging/feature_repo/example.py (100%) rename go/{cmd => internal/feast}/server/logging/feature_repo/feature_store.yaml (100%) rename go/{cmd => internal/feast}/server/logging/filelogstorage.go (100%) rename go/{cmd => internal/feast}/server/logging/filelogstorage_test.go (100%) rename go/{cmd => internal/feast}/server/logging/logging.go (100%) rename go/{cmd => internal/feast}/server/logging/logging_test.go (100%) rename go/{cmd => internal/feast}/server/logging/offlinelogstorage.go (100%) diff --git a/Makefile b/Makefile index 61549d4d0f4..c1260dbe308 100644 --- a/Makefile +++ b/Makefile @@ -167,7 +167,7 @@ compile-protos-go: install-go-proto-dependencies install-protoc-dependencies cd sdk/python && python setup.py build_go_protos compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies - cd sdk/python && python setup.py build_go_lib + cd sdk/python && COMPILE_GO=True python setup.py build_ext --inplace # Needs feast package to setup the feature store test-go: compile-protos-go diff --git a/go/cmd/server/logging/feature_repo/__init__.py b/go/__init__.py similarity index 100% rename from go/cmd/server/logging/feature_repo/__init__.py rename to go/__init__.py diff --git a/go/cmd/server/logging/feature_repo/data/online_store.db b/go/cmd/server/logging/feature_repo/data/online_store.db deleted file mode 100644 index b6ccea139e5aca0c0b1985409dabf0578b8dcd14..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeI#K~KUk6bJBb5HThQH?Hp}2{9pl0YP^lGEmq7PIby7E^G|9j>y3SehEIHU&5o^ z0!BHwc<@60P1>gG+uHYAFCE@>J;gK)qrg>kA*#YK#3hjs!ql~>>#ppNjorjAy(!Lz z_vOlmD3ssD>87&L9SQ^>009U<00Izz00bZa0SNq$z`9u|)rPjQ?tAkIzf9RxNyIYZ zMJOkcx8z7Zg-PVEWUO2jOP=M06t4zW%dx1_yS7G@7p*@?{a$Vuo$*|GY9(j9s;^&- zQq8uFw?MfMKIbiwH*an??6lfl>rXqOcDLVde7!Pv1185BISpF6e!|;nUh2I|6_fG; z){4QRL64D!=E>eux*c@v)?iFG*7#e(KUb*NrFm3~Z9`;#EPnR&9bLMK3bV1yOIJ4Y zA`1Gb9jC{woHo_8L4g1SAOHafKmY;|fB*y_009U<;7|ol(!_B8Kh)ogydVGp2tWV= q5P$##AOHafKmY=n0Pg=d2oQh(1Rwwb2tWV=5P$##AOL~G7x)Cr+SMcg diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go deleted file mode 100644 index 33d56e0a7a2..00000000000 --- a/go/cmd/server/main.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "log" - "net" - "os" - - "github.com/feast-dev/feast/go/cmd/server/logging" - "github.com/feast-dev/feast/go/internal/feast" - "github.com/feast-dev/feast/go/internal/feast/registry" - "github.com/feast-dev/feast/go/protos/feast/serving" - "google.golang.org/grpc" -) - -const ( - flagFeastRepoPath = "FEAST_REPO_PATH" - flagFeastRepoConfig = "FEAST_REPO_CONFIG" - flagFeastSockFile = "FEAST_GRPC_SOCK_FILE" - feastServerVersion = "0.18.0" -) - -// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus -func main() { - repoPath := os.Getenv(flagFeastRepoPath) - repoConfigJSON := os.Getenv(flagFeastRepoConfig) - sockFile := os.Getenv(flagFeastSockFile) - if repoPath == "" && repoConfigJSON == "" { - log.Fatalln(fmt.Sprintf("One of %s of %s environment variables must be set", flagFeastRepoPath, flagFeastRepoConfig)) - } - - var repoConfig *registry.RepoConfig - var err error - if repoConfigJSON != "" { - repoConfig, err = registry.NewRepoConfigFromJSON(repoPath, repoConfigJSON) - if err != nil { - log.Fatalln(err) - } - } else { - repoConfig, err = registry.NewRepoConfigFromFile(repoPath) - if err != nil { - log.Fatalln(err) - } - } - - log.Println("Initializing feature store...") - fs, err := feast.NewFeatureStore(repoConfig, nil) - if err != nil { - log.Fatalln(err) - } - // Disable logging for now - loggingService, err := logging.NewLoggingService(fs, 1000, "", false) - if err != nil { - log.Fatalln(err) - } - defer fs.DestructOnlineStore() - startGrpcServer(fs, loggingService, sockFile) -} - -func startGrpcServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, sockFile string) { - server := newServingServiceServer(fs, loggingService) - log.Printf("Starting a gRPC server listening on %s\n", sockFile) - lis, err := net.Listen("unix", sockFile) - if err != nil { - log.Fatalln(err) - } - grpcServer := grpc.NewServer() - defer grpcServer.Stop() - serving.RegisterServingServiceServer(grpcServer, server) - err = grpcServer.Serve(lis) - if err != nil { - log.Fatalln(err) - } -} diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 24a54894306..f77f644f83a 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -3,7 +3,15 @@ package embedded import ( "context" "fmt" + "github.com/feast-dev/feast/go/internal/feast/server" + "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/feast-dev/feast/go/protos/feast/serving" + "google.golang.org/grpc" "log" + "net" + "os" + "os/signal" + "syscall" "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" @@ -198,6 +206,36 @@ func (s *OnlineFeatureService) GetOnlineFeatures( return nil } +func (s *OnlineFeatureService) StartGprcServer(host string, port int) error { + // TODO(oleksii): enable logging + // Disable logging for now + var loggingService *logging.LoggingService = nil + ser := server.NewGrpcServingServiceServer(s.fs, loggingService) + log.Printf("Starting a gRPC server on host %s port %d\n", host, port) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + return err + } + grpcServer := grpc.NewServer() + serving.RegisterServingServiceServer(grpcServer, ser) + + // Notify this channel when receiving interrupt or termination signals from OS + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + go func() { + // As soon as these signals are received from OS, try to gracefully stop the gRPC server + <-c + fmt.Println("Stopping the gRPC server...") + grpcServer.GracefulStop() + }() + + err = grpcServer.Serve(lis) + if err != nil { + return err + } + return nil +} + /* Read Record Batch from memory managed by Python caller. Python part uses C ABI interface to export this record into C Data Interface, diff --git a/go/internal/__init__.py b/go/internal/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/internal/feast/__init__.py b/go/internal/feast/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index b4a25714808..433b96c08e4 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -43,7 +43,8 @@ type OnlineStore interface { func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) { if onlineStoreType, ok := onlineStoreConfig["type"]; !ok { - return "", false + // If online store type isn't specified, default to sqlite + return "sqlite", true } else { result, ok := onlineStoreType.(string) return result, ok @@ -53,10 +54,11 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { onlineStoreType, ok := getOnlineStoreType(config.OnlineStore) if !ok { + return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore) + } else if onlineStoreType == "sqlite" { onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err - } - if onlineStoreType == "redis" { + } else if onlineStoreType == "redis" { onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore) return onlineStore, err } else { diff --git a/go/cmd/server/server.go b/go/internal/feast/server/grpc_server.go similarity index 76% rename from go/cmd/server/server.go rename to go/internal/feast/server/grpc_server.go index 3708689268b..08f624b6bda 100644 --- a/go/cmd/server/server.go +++ b/go/internal/feast/server/grpc_server.go @@ -1,27 +1,29 @@ -package main +package server import ( "context" - "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" "github.com/google/uuid" ) -type servingServiceServer struct { +const feastServerVersion = "0.0.1" + +type grpcServingServiceServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService serving.UnimplementedServingServiceServer } -func newServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *servingServiceServer { - return &servingServiceServer{fs: fs, loggingService: loggingService} +func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer { + return &grpcServingServiceServer{fs: fs, loggingService: loggingService} } -func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { +func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { return &serving.GetFeastServingInfoResponse{ Version: feastServerVersion, }, nil @@ -30,7 +32,7 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request // Returns an object containing the response to GetOnlineFeatures. // Metadata contains featurenames that corresponds to the number of rows in response.Results. // Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format. -func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) { +func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) { requestId := GenerateRequestId() featuresOrService, err := s.fs.ParseFeatures(request.GetKind()) if err != nil { @@ -74,7 +76,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s EventTimestamps: vector.Timestamps, }) } - if featuresOrService.FeatureService != nil { + if featuresOrService.FeatureService != nil && s.loggingService != nil { go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId) } return resp, nil diff --git a/go/cmd/server/server_test.go b/go/internal/feast/server/grpc_server_test.go similarity index 98% rename from go/cmd/server/server_test.go rename to go/internal/feast/server/grpc_server_test.go index 9d4ffb50bf8..090a8738111 100644 --- a/go/cmd/server/server_test.go +++ b/go/internal/feast/server/grpc_server_test.go @@ -1,4 +1,4 @@ -package main +package server import ( "context" @@ -16,8 +16,8 @@ import ( "github.com/apache/arrow/go/v8/arrow/memory" "github.com/apache/arrow/go/v8/parquet/file" "github.com/apache/arrow/go/v8/parquet/pqarrow" - "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -73,7 +73,7 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en if err != nil { panic(err) } - servingServiceServer := newServingServiceServer(fs, loggingService) + servingServiceServer := NewGrpcServingServiceServer(fs, loggingService) serving.RegisterServingServiceServer(server, servingServiceServer) go func() { diff --git a/go/internal/feast/server/logging/feature_repo/__init__.py b/go/internal/feast/server/logging/feature_repo/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/cmd/server/logging/feature_repo/driver_stats.parquet b/go/internal/feast/server/logging/feature_repo/driver_stats.parquet similarity index 100% rename from go/cmd/server/logging/feature_repo/driver_stats.parquet rename to go/internal/feast/server/logging/feature_repo/driver_stats.parquet diff --git a/go/cmd/server/logging/feature_repo/example.py b/go/internal/feast/server/logging/feature_repo/example.py similarity index 100% rename from go/cmd/server/logging/feature_repo/example.py rename to go/internal/feast/server/logging/feature_repo/example.py diff --git a/go/cmd/server/logging/feature_repo/feature_store.yaml b/go/internal/feast/server/logging/feature_repo/feature_store.yaml similarity index 100% rename from go/cmd/server/logging/feature_repo/feature_store.yaml rename to go/internal/feast/server/logging/feature_repo/feature_store.yaml diff --git a/go/cmd/server/logging/filelogstorage.go b/go/internal/feast/server/logging/filelogstorage.go similarity index 100% rename from go/cmd/server/logging/filelogstorage.go rename to go/internal/feast/server/logging/filelogstorage.go diff --git a/go/cmd/server/logging/filelogstorage_test.go b/go/internal/feast/server/logging/filelogstorage_test.go similarity index 100% rename from go/cmd/server/logging/filelogstorage_test.go rename to go/internal/feast/server/logging/filelogstorage_test.go diff --git a/go/cmd/server/logging/logging.go b/go/internal/feast/server/logging/logging.go similarity index 100% rename from go/cmd/server/logging/logging.go rename to go/internal/feast/server/logging/logging.go diff --git a/go/cmd/server/logging/logging_test.go b/go/internal/feast/server/logging/logging_test.go similarity index 100% rename from go/cmd/server/logging/logging_test.go rename to go/internal/feast/server/logging/logging_test.go diff --git a/go/cmd/server/logging/offlinelogstorage.go b/go/internal/feast/server/logging/offlinelogstorage.go similarity index 100% rename from go/cmd/server/logging/offlinelogstorage.go rename to go/internal/feast/server/logging/offlinelogstorage.go diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index 410af1d8feb..2a8e8849f73 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -132,6 +132,17 @@ def get_online_features( resp = record_batch_to_online_response(record_batch) return OnlineResponse(resp) + def start_server(self, host: str, port: int, server_type: str): + server_type = server_type.lower() + if server_type == "grpc": + self._service.StartGprcServer(host, port) + elif server_type == "http": + # TODO(tsotne): implement go-based http server + raise NotImplemented("Go-based HTTP server will be implemented soon") + else: + raise ValueError("The server type must be either 'http' or 'grpc'") + + def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: if isinstance(value, Value_pb2.RepeatedValue): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 95b14588499..ae3982e1690 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1306,6 +1306,18 @@ def get_online_features( native_entity_values=True, ) + def _lazy_init_go_server(self): + """Lazily initialize self._go_server if it hasn't been initialized before.""" + from feast.embedded_go.online_features_service import ( + EmbeddedOnlineFeatureServer, + ) + + # Lazily start the go server on the first request + if self._go_server is None: + self._go_server = EmbeddedOnlineFeatureServer( + str(self.repo_path.absolute()), self.config, self + ) + def _get_online_features( self, features: Union[List[str], FeatureService], @@ -1323,15 +1335,7 @@ def _get_online_features( # If Go feature server is enabled, send request to it instead of going through regular Python logic if self.config.go_feature_retrieval: - from feast.embedded_go.online_features_service import ( - EmbeddedOnlineFeatureServer, - ) - - # Lazily start the go server on the first request - if self._go_server is None: - self._go_server = EmbeddedOnlineFeatureServer( - str(self.repo_path.absolute()), self.config, self - ) + self._lazy_init_go_server() entity_native_values: Dict[str, List[Any]] if not native_entity_values: @@ -1957,7 +1961,14 @@ def _get_feature_views_to_use( @log_exceptions_and_usage def serve(self, host: str, port: int, no_access_log: bool) -> None: """Start the feature consumption server locally on a given port.""" - feature_server.start_server(self, host, port, no_access_log) + if self.config.go_feature_retrieval: + # Start go server instead of python if the flag is enabled + self._lazy_init_go_server() + # TODO(tsotne) add http/grpc flag in CLI and replace the hardcoded variable + self._go_server.start_server(host, port, "grpc") + else: + # Start the python server if go server isn't enabled + feature_server.start_server(self, host, port, no_access_log) @log_exceptions_and_usage def get_feature_server_endpoint(self) -> Optional[str]: From 6026b5fbe9a776520aa2ac337ed850d88efb87c2 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Wed, 27 Apr 2022 00:52:52 -0700 Subject: [PATCH 2/3] Format stuff Signed-off-by: Tsotne Tabidze --- Makefile | 2 +- go/embedded/online_features.go | 2 +- sdk/python/feast/embedded_go/online_features_service.py | 3 +-- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/transformation_server.py | 2 +- 5 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index c1260dbe308..a808e3f4fb1 100644 --- a/Makefile +++ b/Makefile @@ -178,7 +178,7 @@ format-go: gofmt -s -w go/ lint-go: compile-protos-go - go vet ./go/internal/feast ./go/cmd/server + go vet ./go/internal/feast ./go/embedded # Docker diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index f77f644f83a..636ccd403b3 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -220,7 +220,7 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error { serving.RegisterServingServiceServer(grpcServer, ser) // Notify this channel when receiving interrupt or termination signals from OS - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) go func() { // As soon as these signals are received from OS, try to gracefully stop the gRPC server diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index 2a8e8849f73..42c18de263e 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -138,12 +138,11 @@ def start_server(self, host: str, port: int, server_type: str): self._service.StartGprcServer(host, port) elif server_type == "http": # TODO(tsotne): implement go-based http server - raise NotImplemented("Go-based HTTP server will be implemented soon") + raise NotImplementedError("Go-based HTTP server will be implemented soon") else: raise ValueError("The server type must be either 'http' or 'grpc'") - def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: if isinstance(value, Value_pb2.RepeatedValue): _proto_to_arrow(value) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ae3982e1690..b37f108486b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -111,7 +111,7 @@ class FeatureStore: repo_path: Path _registry: Registry _provider: Provider - _go_server: Optional["EmbeddedOnlineFeatureServer"] + _go_server: "EmbeddedOnlineFeatureServer" @log_exceptions def __init__( diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 83f4af749e3..8e0efd73137 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -2,10 +2,10 @@ import sys from concurrent import futures -import grpc import pyarrow as pa from grpc_reflection.v1alpha import reflection +import grpc from feast.errors import OnDemandFeatureViewNotFoundException from feast.feature_store import FeatureStore from feast.protos.feast.serving.TransformationService_pb2 import ( From d6e98a2655e89c6c8a52751cfe907978e7a72b20 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Wed, 27 Apr 2022 10:45:23 -0700 Subject: [PATCH 3/3] Split grpc & http methods in python and sort imports correctly Signed-off-by: Tsotne Tabidze --- .../feast/embedded_go/online_features_service.py | 11 ++--------- sdk/python/feast/feature_store.py | 4 ++-- sdk/python/feast/transformation_server.py | 2 +- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index 42c18de263e..5bb27e2ae03 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -132,15 +132,8 @@ def get_online_features( resp = record_batch_to_online_response(record_batch) return OnlineResponse(resp) - def start_server(self, host: str, port: int, server_type: str): - server_type = server_type.lower() - if server_type == "grpc": - self._service.StartGprcServer(host, port) - elif server_type == "http": - # TODO(tsotne): implement go-based http server - raise NotImplementedError("Go-based HTTP server will be implemented soon") - else: - raise ValueError("The server type must be either 'http' or 'grpc'") + def start_grpc_server(self, host: str, port: int): + self._service.StartGprcServer(host, port) def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b37f108486b..13c73612f09 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1964,8 +1964,8 @@ def serve(self, host: str, port: int, no_access_log: bool) -> None: if self.config.go_feature_retrieval: # Start go server instead of python if the flag is enabled self._lazy_init_go_server() - # TODO(tsotne) add http/grpc flag in CLI and replace the hardcoded variable - self._go_server.start_server(host, port, "grpc") + # TODO(tsotne) add http/grpc flag in CLI and call appropriate method here depending on that + self._go_server.start_grpc_server(host, port) else: # Start the python server if go server isn't enabled feature_server.start_server(self, host, port, no_access_log) diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 8e0efd73137..83f4af749e3 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -2,10 +2,10 @@ import sys from concurrent import futures +import grpc import pyarrow as pa from grpc_reflection.v1alpha import reflection -import grpc from feast.errors import OnDemandFeatureViewNotFoundException from feast.feature_store import FeatureStore from feast.protos.feast.serving.TransformationService_pb2 import (