Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Use a daemon thread to monitor the go feature server exclusively
Also reenable the tests for lifecycle management of the goserver

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Mar 10, 2022
commit 612c962b700a8123bdf9f7fc37e77e6df388324b
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build: protos build-java build-docker build-html

# Python SDK

install-python-ci-dependencies: install-go-ci-dependencies
install-python-ci-dependencies: install-go-proto-dependencies
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt
cd sdk/python && COMPILE_GO=true python setup.py develop

Expand Down Expand Up @@ -125,19 +125,21 @@ build-java-no-tests:

# Go SDK

install-go-ci-dependencies:
install-go-proto-dependencies:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26.0
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1.0

compile-protos-go: install-go-ci-dependencies
install-protoc-dependencies:
pip install grpcio-tools==1.34.0

compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
python sdk/python/setup.py build_go_protos

compile-go-feature-server: compile-protos-go
go mod tidy
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver

test-go: install-go-ci-dependencies
test-go: compile-protos-go
go test ./...

format-go:
Expand Down
1 change: 0 additions & 1 deletion go/cmd/goserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type FeastEnvConfig struct {
}

// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus

func main() {

var feastEnvConfig FeastEnvConfig
Expand Down
Binary file not shown.
18 changes: 2 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ class FeatureStore:

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
go_server_use_thread: bool = False,
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
"""
Creates a FeatureStore object.
Expand All @@ -135,7 +132,6 @@ def __init__(
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None
self._go_server_use_thread = go_server_use_thread

@log_exceptions
def version(self) -> str:
Expand Down Expand Up @@ -1233,11 +1229,7 @@ def get_online_features(
if self.config.go_feature_server:
# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = GoServer(
str(self.repo_path.absolute()),
self.config,
self._go_server_use_thread,
)
self._go_server = GoServer(str(self.repo_path.absolute()), self.config,)
return self._go_server.get_online_features(
features, columnar, full_feature_names
)
Expand Down Expand Up @@ -1861,12 +1853,6 @@ def kill_go_server(self):
if self._go_server:
self._go_server.kill_go_server_explicitly()

def set_go_server_use_thread(self, use: bool):
if self._go_server:
self._go_server.set_use_thread(use)
else:
self._go_server_use_thread = use


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
set_of_row_lengths = {len(v) for v in join_key_values.values()}
Expand Down
150 changes: 46 additions & 104 deletions sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import ctypes
import logging
import os
import pathlib
import platform
import random
import shutil
import signal
import string
import subprocess
Expand All @@ -44,13 +42,22 @@
from feast.repo_config import RepoConfig
from feast.type_map import python_values_to_proto_values

_logger = logging.getLogger(__name__)
Comment thread
felixwang9817 marked this conversation as resolved.


class GoServerConnection:
def __init__(self, config: RepoConfig, repo_path: str):
self._process: Optional[Popen[bytes]] = None
self._config = config
self._repo_path = repo_path
self.temp_dir = tempfile.TemporaryDirectory()
self._client: Optional[ServingServiceStub] = None

@property
def client(self):
if self._client:
return self._client
raise RuntimeError("Client not established with go subprocess")

def _get_unix_domain_file_path(self) -> Path:
# This method should return a file that go server should listen on and that the python channel
Expand Down Expand Up @@ -81,7 +88,7 @@ def connect(self) -> bool:
self._process = Popen([executable], cwd=cwd, env=env,)

channel = grpc.insecure_channel(f"unix:{self.sock_file}")
self.client: ServingServiceStub = ServingServiceStub(channel)
self._client = ServingServiceStub(channel)
Comment thread
felixwang9817 marked this conversation as resolved.

try:
self._check_grpc_connection()
Expand All @@ -91,11 +98,10 @@ def connect(self) -> bool:

def kill_process(self):
if self._process:
# self._process.terminate()
self._process.send_signal(signal.SIGINT)
self._process.terminate()

def is_process_alive(self):
return self._process and self._process.poll()
return self._process and self._process.poll() is None

def wait_for_process(self, timeout):
self._process.wait(timeout)
Expand All @@ -107,7 +113,7 @@ def wait_for_process(self, timeout):
wait=wait_exponential(multiplier=0.1, min=0.1, max=5),
)
def _check_grpc_connection(self):
self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest())
return self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest())


class GoServer:
Expand All @@ -118,57 +124,20 @@ class GoServer:
Attributes:
_repo_path: The path to the Feast repo for which this go server is defined.
_config: The RepoConfig for the Feast repo for which this go server is defined.
_go_server_use_thread: set whether or not to use a background thread to monitor go server
"""

_repo_path: str
_config: RepoConfig
_go_server_use_thread: bool

def __init__(
self, repo_path: str, config: RepoConfig, go_server_use_thread: bool = False,
):
def __init__(self, repo_path: str, config: RepoConfig):
"""Creates a GoServer object."""
self._repo_path = repo_path
self._config = config
self._go_server_started = threading.Event()
self._use_thread = go_server_use_thread
self._shared_connection = GoServerConnection(config, repo_path)
self._dev_mode = "dev" in feast.__version__
if not is_test() and self._dev_mode:
self._build_binaries()

if self._check_use_thread():
self._start_go_server_use_thread()
else:
self._start_go_server()

def _check_use_thread(self):
return self._use_thread

def set_use_thread(self, use: bool):
self._use_thread = use

def _build_binaries(self):

goos = platform.system().lower()
goarch = "amd64" if platform.machine() == "x86_64" else "arm64"
binaries_path = (pathlib.Path(__file__).parent / "../feast/binaries").resolve()
binaries_path_abs = str(binaries_path.absolute())
if binaries_path.exists():
shutil.rmtree(binaries_path_abs)
os.mkdir(binaries_path_abs)

subprocess.check_output(
[
"go",
"build",
"-o",
f"{binaries_path_abs}/goserver_{goos}_{goarch}",
"github.com/feast-dev/feast/go/cmd/goserver",
],
env={"GOOS": goos, "GOARCH": goarch, **os.environ},
)
self._start_go_server_use_thread()

def get_online_features(
self,
Expand Down Expand Up @@ -198,7 +167,7 @@ def get_online_features(
ValueError: If some other error occurs.
"""
# Wait for go server subprocess to restart before asking for features
if self._check_use_thread() and not self._go_server_started.is_set():
if not self._go_server_started.is_set():
self._go_server_started.wait()

request = GetOnlineFeaturesRequest(full_feature_names=full_feature_names)
Expand All @@ -220,9 +189,7 @@ def get_online_features(
if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
# If the server became unavailable, it could mean that the subprocess died or fell
# into a bad state, so the resolution is to wait for go server to restart in the background
if not self._check_use_thread():
self._start_go_server()
elif not self._go_server_started.is_set():
if not self._go_server_started.is_set():
self._go_server_started.wait()
# Retry request with the new Go subprocess
response = self._shared_connection.client.GetOnlineFeatures(
Expand All @@ -249,96 +216,71 @@ def get_online_features(

def _start_go_server_use_thread(self):

self._go_server_background_thread = GoServerBackgroundThread(
"GoServerBackgroundThread",
self._shared_connection,
self._go_server_started,
self._go_server_background_thread = GoServerMonitorThread(
"GoServerMonitorThread", self._shared_connection, self._go_server_started
)
self._go_server_background_thread.start()
atexit.register(lambda: self._go_server_background_thread.stop_go_server())
atexit.register(lambda: self._go_server_background_thread.stop())
signal.signal(
signal.SIGTERM,
lambda sig, frame: self._go_server_background_thread.stop_go_server(),
signal.SIGTERM, lambda sig, frame: self._go_server_background_thread.stop(),
)
signal.signal(
signal.SIGINT,
lambda sig, frame: self._go_server_background_thread.stop_go_server(),
signal.SIGINT, lambda sig, frame: self._go_server_background_thread.stop(),
)

# Wait for go server subprocess to start for the first time before returning
self._go_server_started.wait()

def _start_go_server(self):
if self._shared_connection.is_process_alive():
self._shared_connection.kill_process()

self._shared_connection.connect()
atexit.register(lambda: self._shared_connection.kill_process())
signal.signal(
signal.SIGTERM, lambda sig, frame: self._shared_connection.kill_process()
)
signal.signal(
signal.SIGINT, lambda sig, frame: self._shared_connection.kill_process()
)

def kill_go_server_explicitly(self):
if self._check_use_thread():
self._go_server_background_thread.stop_go_server()
else:
self._shared_connection.kill_process()
self._go_server_background_thread.stop()


# https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/
class GoServerBackgroundThread(threading.Thread):
class GoServerMonitorThread(threading.Thread):
def __init__(
self,
name: str,
shared_connection: GoServerConnection,
go_server_started: threading.Event,
go_server_first_started: threading.Event,
):
threading.Thread.__init__(self)
self.name = name
self.setName(name)
Comment thread
achals marked this conversation as resolved.
Outdated
self._shared_connection = shared_connection
self._go_server_started = go_server_started
self._is_cancelled = False
self.setDaemon(True)
Comment thread
achals marked this conversation as resolved.
Outdated
self._go_server_started = go_server_first_started

def run(self):
# Target function of the thread class
_logger.info("Started GoServerMonitorThread")
try:
while True:
self._go_server_started.clear()
while not self._is_cancelled:

# If we fail to connect to grpc stub, terminate subprocess and repeat
if not self._shared_connection.connect():
self._shared_connection.kill_process()
continue
self._go_server_started.set()
while True:
else:
_logger.info("Set go server first started event")
self._go_server_started.set()
while not self._is_cancelled:
try:
# Making a blocking wait by setting timeout to a very long time so we don't waste cpu cycle
self._shared_connection.wait_for_process(3600)
Comment thread
achals marked this conversation as resolved.
except subprocess.TimeoutExpired:
pass

_logger.info(
"Process state: %s %s",
Comment thread
felixwang9817 marked this conversation as resolved.
Outdated
self._shared_connection._process.pid,
self._shared_connection._process.returncode,
)
if not self._shared_connection.is_process_alive():
break
finally:
# Main thread exits
self._shared_connection.kill_process()

def stop_go_server(self):
thread_id = self._get_id()
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
thread_id, ctypes.py_object(SystemExit)
)
# TODO: Review that kill process here but run also has to stop
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
else:
self._shared_connection.kill_process()
Comment thread
achals marked this conversation as resolved.

def _get_id(self):
# returns id of the respective thread
if hasattr(self, "_thread_id"):
return self._thread_id
for id, thread in threading._active.items():
if thread is self:
return id
def stop(self):
_logger.info("Stopping GoServerMonitorThread")
Comment thread
achals marked this conversation as resolved.
Outdated
self._is_cancelled = True
self._shared_connection.kill_process()
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
37 changes: 0 additions & 37 deletions sdk/python/go_build.py

This file was deleted.

Loading