Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wait on grpc connection check
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Mar 10, 2022
commit 88e2c49052c238032a9321b9cac86af57a6a2556
12 changes: 10 additions & 2 deletions sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,26 +250,34 @@ def run(self):
while not self._is_cancelled:

# If we fail to connect to the gRPC stub, terminate the subprocess and retry
_logger.info("Connecting to subprocess")
if not self._shared_connection.connect():
_logger.info("Failed to connect, killing and retrying")
self._shared_connection.kill_process()
continue
else:
_logger.debug("Go feature server started")
self._go_server_started.set()
_logger.info("Status: %s", self._is_cancelled)
while not self._is_cancelled:
try:
# Block with a very long timeout so that waiting does not waste CPU cycles
self._shared_connection.wait_for_process(3600)
Comment thread
achals marked this conversation as resolved.
except subprocess.TimeoutExpired:
pass

_logger.info(
"No longer waiting for process: %s, %s, %s",
self._shared_connection._process.pid,
self._shared_connection._process.returncode,
self._shared_connection.is_process_alive(),
)
if not self._shared_connection.is_process_alive():
break
finally:
# Main thread exits
self._shared_connection.kill_process()

def stop(self):
_logger.debug("Stopping monitoring thread and terminating go feature server")
_logger.info("Stopping monitoring thread and terminating go feature server")
self._is_cancelled = True
self._shared_connection.kill_process()
Original file line number Diff line number Diff line change
Expand Up @@ -1020,14 +1020,17 @@ def test_go_server_life_cycle(go_cycle_environment, go_data_sources):
os.kill(go_fs_pid, signal.SIGTERM)
# At the same time, check that resources are cleaned up properly once the child process is killed
# Check that background thread has terminated
monitor_thread = None
for thread in threading.enumerate():
if thread.name == "GoServerMonitorThread":
monitor_thread = thread
assert monitor_thread
assert monitor_thread.is_alive()
monitor_thread_alive = False
monitor_thread = fs._go_server._go_server_background_thread
assert monitor_thread.daemon

print(f"Monitor thread: {monitor_thread}, {monitor_thread.ident}")

for thread in threading.enumerate():
if thread.ident == monitor_thread.ident and thread.is_alive():
monitor_thread_alive = True
assert monitor_thread_alive

# Check if go server subprocess is still active even if background thread and process are killed
go_server_still_alive = False
for proc in psutil.process_iter():
Expand Down Expand Up @@ -1055,6 +1058,7 @@ def test_go_server_life_cycle(go_cycle_environment, go_data_sources):
)
new_go_fs_pid = fs._go_server._shared_connection._process.pid
assert new_go_fs_pid != go_fs_pid
fs._go_server._shared_connection._check_grpc_connection()

# Ensure process is still running.
assert fs._go_server._shared_connection._process.poll() is None
Expand All @@ -1068,8 +1072,8 @@ def test_go_server_life_cycle(go_cycle_environment, go_data_sources):
# Ensure process is dead.
assert fs._go_server._shared_connection._process.poll() is not None
Comment thread
achals marked this conversation as resolved.
Outdated
# Ensure monitoring thread is also dead.
live_threads = [t.name for t in threading.enumerate()]
assert "GoServerMonitorThread" not in live_threads
live_threads = [t.ident for t in threading.enumerate()]
assert monitor_thread.ident not in live_threads


def response_feature_name(feature: str, full_feature_names: bool) -> str:
Expand Down