Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Cleanup
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Mar 10, 2022
commit 39a66e3c7306a1d7ae09aefd0a49a60beb6de141
7 changes: 5 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ def apply(
service.name, project=self.project, commit=False
)

# If a go server is running, kill it so that it can be recreated in `update_infra` with
# the latest registry state.
self.kill_go_server()

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
Expand All @@ -750,8 +754,7 @@ def teardown(self):

entities = self.list_entities()

if self._go_server:
self._go_server.kill_go_server_explicitly()
self.kill_go_server()

self._get_provider().teardown_infra(self.project, tables, entities)
self._registry.teardown()
Expand Down
17 changes: 10 additions & 7 deletions sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,28 +246,28 @@ def __init__(

def run(self):
# Target function of the thread class
_logger.info(
_logger.debug(
"%s Started monitoring thread to keep go feature server alive", self.ident
)
try:
while not self._is_cancelled.is_set():

# If we fail to connect to grpc stub, terminate subprocess and repeat
_logger.info("%s Connecting to subprocess", self.ident)
_logger.debug("%s Connecting to subprocess", self.ident)
if not self._shared_connection.connect():
_logger.info(
_logger.debug(
"%s Failed to connect, killing and retrying", self.ident
)
self._shared_connection.kill_process()
continue
else:
_logger.info(
_logger.debug(
"%s Go feature server started, process: %s",
self.ident,
self._shared_connection._process.pid,
)
self._go_server_started.set()
_logger.info(
_logger.debug(
"%s is_cancelled status: %s", self.ident, self._is_cancelled
)
while not self._is_cancelled.is_set():
Expand All @@ -276,7 +276,7 @@ def run(self):
self._shared_connection.wait_for_process(3600)
Comment thread
achals marked this conversation as resolved.
except subprocess.TimeoutExpired:
pass
_logger.info(
_logger.debug(
"%s No longer waiting for process: %s, %s, %s",
self.ident,
self._shared_connection._process.pid,
Expand All @@ -290,6 +290,9 @@ def run(self):
self._shared_connection.kill_process()

def stop(self):
# _logger.info("%s Stopping monitoring thread and terminating go feature server", self.ident)
_logger.debug(
"%s Stopping monitoring thread and terminating go feature server",
self.ident,
)
self._is_cancelled.set()
self._shared_connection.kill_process()