Skip to content
Prev Previous commit
Next Next commit
Make max_workers parameter configurable.
Signed-off-by: mehmettokgoz <mehmet.tokgoz@hazelcast.com>
Signed-off-by: Danny C <d.chiao@gmail.com>
  • Loading branch information
Mehmet Tokgöz authored and adchia committed Sep 5, 2023
commit 5ea58d12ea1beaa8d08e18f9ab42b9e5af11a85a
18 changes: 13 additions & 5 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,26 +701,34 @@ def serve_command(
type=click.STRING,
default="localhost:50051",
show_default=True,
help="Specify an address for the server",
help="Address of the gRPC server",
)
@click.option(
"--stream_feature_view",
"-sfv",
type=click.STRING,
show_default=False,
help="Specify the stream feature view to update",
help="Stream feature view to push streaming features",
)
@click.option(
"--push_mode",
"-to",
type=click.STRING,
default="online",
show_default=False,
help="Specify the stream feature view to update",
help="Push mode for the streaming data",
)
@click.option(
"--max_workers",
"-w",
type=click.INT,
default=10,
show_default=False,
help="The maximum number of threads that can be used to execute the gRPC calls",
)
@click.pass_context
def listen_command(
ctx: click.Context, address: str, stream_feature_view: str, push_mode: str
ctx: click.Context, address: str, stream_feature_view: str, push_mode: str, max_workers: int
):
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
Expand All @@ -733,7 +741,7 @@ def listen_command(
to = PushMode.OFFLINE
else:
to = PushMode.ONLINE_AND_OFFLINE
server = GetGrpcServer(store, address, stream_feature_view, to)
server = GetGrpcServer(store, address, stream_feature_view, to, max_workers)
server.start()


Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/infra/contrib/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def GrpcIngestFeature(self, request, context):


class GrpcIngestFeatureServer:
def __init__(self, address, fs: FeatureStore, sfv_name, to):
def __init__(self, address, fs: FeatureStore, sfv_name, to, max_workers):
Comment thread
adchia marked this conversation as resolved.
Outdated
Comment thread
adchia marked this conversation as resolved.
Outdated
self.address = address
self.fs = fs
self.sfv = sfv_name
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
add_GrpcIngestFeatureServiceServicer_to_server(
Comment thread
adchia marked this conversation as resolved.
Outdated
GrpcIngestFeatureService(self.fs, self.sfv, to), self.server
)
Expand All @@ -51,5 +51,5 @@ def start(self):
self.server.wait_for_termination()


def GetGrpcServer(fs: FeatureStore, address: str, sfv: str, to: PushMode):
return GrpcIngestFeatureServer(address, fs, sfv, to)
def GetGrpcServer(fs: FeatureStore, address: str, sfv: str, to: PushMode, max_workers):
Comment thread
adchia marked this conversation as resolved.
Outdated
return GrpcIngestFeatureServer(address, fs, sfv, to, max_workers)