Skip to content

Commit 0632fdf

Browse files
committed
move ingestion job related code to contrib
1 parent 15ea27c commit 0632fdf

14 files changed

Lines changed: 441 additions & 257 deletions

File tree

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
package feast.common.it;public class ExternalApp {
2+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2019 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.jc.grpc;
18+
19+
import com.google.api.gax.rpc.InvalidArgumentException;
20+
import com.google.protobuf.InvalidProtocolBufferException;
21+
import feast.auth.service.AuthorizationService;
22+
import feast.common.interceptors.GrpcMessageInterceptor;
23+
import feast.jc.config.FeastProperties;
24+
import feast.jc.service.JobService;
25+
import feast.proto.core.CoreServiceGrpc.CoreServiceImplBase;
26+
import feast.proto.core.CoreServiceProto.*;
27+
import feast.proto.core.FeatureSetProto.FeatureSet;
28+
import io.grpc.Status;
29+
import io.grpc.StatusRuntimeException;
30+
import io.grpc.stub.StreamObserver;
31+
import lombok.extern.slf4j.Slf4j;
32+
import net.devh.boot.grpc.server.service.GrpcService;
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.security.access.AccessDeniedException;
35+
import org.springframework.security.core.context.SecurityContextHolder;
36+
37+
import java.util.List;
38+
import java.util.NoSuchElementException;
39+
import java.util.stream.Collectors;
40+
41+
/** Implementation of the feast core GRPC service. */
42+
@Slf4j
43+
@GrpcService(interceptors = {GrpcMessageInterceptor.class})
44+
public class CoreServiceImpl extends CoreServiceImplBase {
45+
46+
private final FeastProperties feastProperties;
47+
48+
private JobService jobService;
49+
50+
@Autowired
51+
public CoreServiceImpl(
52+
JobService jobService,
53+
FeastProperties feastProperties) {
54+
this.jobService = jobService;
55+
this.feastProperties = feastProperties;
56+
}
57+
58+
@Override
59+
public void listIngestionJobs(
60+
ListIngestionJobsRequest request,
61+
StreamObserver<ListIngestionJobsResponse> responseObserver) {
62+
try {
63+
ListIngestionJobsResponse response = this.jobService.listJobs(request);
64+
responseObserver.onNext(response);
65+
responseObserver.onCompleted();
66+
} catch (InvalidArgumentException e) {
67+
log.error("Received an invalid request on calling listIngestionJobs method:", e);
68+
responseObserver.onError(
69+
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
70+
} catch (Exception e) {
71+
log.error("Unexpected exception on calling listIngestionJobs method:", e);
72+
responseObserver.onError(
73+
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
74+
}
75+
}
76+
77+
@Override
78+
public void restartIngestionJob(
79+
RestartIngestionJobRequest request,
80+
StreamObserver<RestartIngestionJobResponse> responseObserver) {
81+
try {
82+
RestartIngestionJobResponse response = this.jobService.restartJob(request);
83+
responseObserver.onNext(response);
84+
responseObserver.onCompleted();
85+
} catch (NoSuchElementException e) {
86+
log.error(
87+
"Attempted to restart an nonexistent job on calling restartIngestionJob method:", e);
88+
responseObserver.onError(
89+
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
90+
} catch (UnsupportedOperationException e) {
91+
log.error("Recieved an unsupported request on calling restartIngestionJob method:", e);
92+
responseObserver.onError(
93+
94+
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
95+
} catch (Exception e) {
96+
log.error("Unexpected exception on calling restartIngestionJob method:", e);
97+
responseObserver.onError(
98+
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
99+
}
100+
}
101+
102+
@Override
103+
public void stopIngestionJob(
104+
StopIngestionJobRequest request, StreamObserver<StopIngestionJobResponse>
105+
responseObserver) {
106+
try {
107+
StopIngestionJobResponse response = this.jobService.stopJob(request);
108+
responseObserver.onNext(response);
109+
responseObserver.onCompleted();
110+
} catch (NoSuchElementException e) {
111+
log.error("Attempted to stop an nonexistent job on calling stopIngestionJob method:", e);
112+
responseObserver.onError(
113+
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
114+
} catch (UnsupportedOperationException e) {
115+
log.error("Recieved an unsupported request on calling stopIngestionJob method:", e);
116+
responseObserver.onError(
117+
118+
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
119+
} catch (Exception e) {
120+
log.error("Unexpected exception on calling stopIngestionJob method:", e);
121+
responseObserver.onError(
122+
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
123+
}
124+
}
125+
}

protos/feast/core/CoreService.proto

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,27 +90,29 @@ service CoreService {
9090

9191
// Lists all projects active projects.
9292
rpc ListProjects (ListProjectsRequest) returns (ListProjectsResponse);
93-
93+
94+
// Internal API for Job Coordinator to update featureSet's status once responsible ingestion job is running
95+
rpc UpdateFeatureSetStatus(UpdateFeatureSetStatusRequest) returns (UpdateFeatureSetStatusResponse);
96+
97+
}
98+
99+
service JobCoordinatorService {
94100
// List Ingestion Jobs given an optional filter.
95-
// Returns allow ingestions matching the given request filter.
101+
// Returns allow ingestions matching the given request filter.
96102
// Returns all ingestion jobs if no filter is provided.
97103
// Returns an empty list if no ingestion jobs match the filter.
98104
rpc ListIngestionJobs(ListIngestionJobsRequest) returns (ListIngestionJobsResponse);
99105

100106
// Restart an Ingestion Job. Restarts the ingestion job with the given job id.
101107
// NOTE: Data might be lost during the restart for some job runners.
102-
// Does not support stopping a job in a transitional (ie pending, suspending, aborting),
108+
// Does not support stopping a job in a transitional (ie pending, suspending, aborting),
103109
// terminal state (ie suspended or aborted) or unknown status
104110
rpc RestartIngestionJob(RestartIngestionJobRequest) returns (RestartIngestionJobResponse);
105-
111+
106112
// Stop an Ingestion Job. Stop (Aborts) the ingestion job with the given job id.
107113
// Does nothing if the target job if already in a terminal state (ie suspended or aborted).
108114
// Does not support stopping a job in a transitional (ie pending, suspending, aborting) or unknown status
109115
rpc StopIngestionJob(StopIngestionJobRequest) returns (StopIngestionJobResponse);
110-
111-
// Internal API for Job Coordinator to update featureSet's status once responsible ingestion job is running
112-
rpc UpdateFeatureSetStatus(UpdateFeatureSetStatusRequest) returns (UpdateFeatureSetStatusResponse);
113-
114116
}
115117

116118
// Request for a single feature set

sdk/python/feast/cli.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import yaml
2323

2424
from feast.client import Client
25+
from feast.contrib.job_coordinator.client import Client as JobCoordinatorClient
2526
from feast.config import Config
2627
from feast.core.IngestionJob_pb2 import IngestionJobStatus
2728
from feast.feature_set import FeatureSet, FeatureSetRef
@@ -351,7 +352,7 @@ def ingest_job_list(job_id, feature_set_ref, store_name):
351352
feature_set_ref = FeatureSetRef.from_str(feature_set_ref)
352353

353354
# pull & render ingestion jobs as a table
354-
feast_client = Client()
355+
feast_client = JobCoordinatorClient()
355356
table = []
356357
for ingest_job in feast_client.list_ingest_jobs(
357358
job_id=job_id, feature_set_ref=feature_set_ref, store_name=store_name
@@ -370,7 +371,7 @@ def ingest_job_describe(job_id: str):
370371
Describe the ingestion job with the given id.
371372
"""
372373
# find ingestion job for id
373-
feast_client = Client()
374+
feast_client = JobCoordinatorClient()
374375
jobs = feast_client.list_ingest_jobs(job_id=job_id)
375376
if len(jobs) < 1:
376377
print(f"Ingestion Job with id {job_id} could not be found")
@@ -399,7 +400,7 @@ def ingest_job_stop(wait: bool, timeout: int, job_id: str):
399400
Stop ingestion job for id.
400401
"""
401402
# find ingestion job for id
402-
feast_client = Client()
403+
feast_client = JobCoordinatorClient()
403404
jobs = feast_client.list_ingest_jobs(job_id=job_id)
404405
if len(jobs) < 1:
405406
print(f"Ingestion Job with id {job_id} could not be found")

sdk/python/feast/client.py

Lines changed: 2 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,16 @@
5858
ListFeatureSetsResponse,
5959
ListFeaturesRequest,
6060
ListFeaturesResponse,
61-
ListIngestionJobsRequest,
6261
ListProjectsRequest,
6362
ListProjectsResponse,
64-
RestartIngestionJobRequest,
65-
StopIngestionJobRequest,
6663
)
6764
from feast.core.CoreService_pb2_grpc import CoreServiceStub
6865
from feast.core.FeatureSet_pb2 import FeatureSetStatus
6966
from feast.feature import Feature, FeatureRef
70-
from feast.feature_set import Entity, FeatureSet, FeatureSetRef
67+
from feast.feature_set import Entity, FeatureSet
7168
from feast.grpc import auth as feast_auth
7269
from feast.grpc.grpc import create_grpc_channel
73-
from feast.job import IngestJob, RetrievalJob
70+
from feast.job import RetrievalJob
7471
from feast.loaders.abstract_producer import get_producer
7572
from feast.loaders.file import export_source_to_staging_location
7673
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT, get_feature_row_chunks
@@ -739,78 +736,6 @@ def get_online_features(
739736
response = OnlineResponse(response)
740737
return response
741738

742-
def list_ingest_jobs(
743-
self,
744-
job_id: str = None,
745-
feature_set_ref: FeatureSetRef = None,
746-
store_name: str = None,
747-
):
748-
"""
749-
List the ingestion jobs currently registered in Feast, with optional filters.
750-
Provides detailed metadata about each ingestion job.
751-
752-
Args:
753-
job_id: Select specific ingestion job with the given job_id
754-
feature_set_ref: Filter ingestion jobs by target feature set (via reference)
755-
store_name: Filter ingestion jobs by target feast store's name
756-
757-
Returns:
758-
List of IngestJobs matching the given filters
759-
"""
760-
# construct list request
761-
feature_set_ref_proto = None
762-
if feature_set_ref:
763-
feature_set_ref_proto = feature_set_ref.to_proto()
764-
list_filter = ListIngestionJobsRequest.Filter(
765-
id=job_id,
766-
feature_set_reference=feature_set_ref_proto,
767-
store_name=store_name,
768-
)
769-
request = ListIngestionJobsRequest(filter=list_filter)
770-
# make list request & unpack response
771-
response = self._core_service.ListIngestionJobs(request, metadata=self._get_grpc_metadata(),) # type: ignore
772-
ingest_jobs = [
773-
IngestJob(proto, self._core_service, auth_metadata_plugin=self._auth_metadata) for proto in response.jobs # type: ignore
774-
]
775-
776-
return ingest_jobs
777-
778-
def restart_ingest_job(self, job: IngestJob):
779-
"""
780-
Restart ingestion job currently registered in Feast.
781-
NOTE: Data might be lost during the restart for some job runners.
782-
Does not support stopping a job in a transitional (ie pending, suspending, aborting),
783-
terminal state (ie suspended or aborted) or unknown status
784-
785-
Args:
786-
job: IngestJob to restart
787-
"""
788-
request = RestartIngestionJobRequest(id=job.id)
789-
try:
790-
self._core_service.RestartIngestionJob(
791-
request, metadata=self._get_grpc_metadata(),
792-
) # type: ignore
793-
except grpc.RpcError as e:
794-
raise grpc.RpcError(e.details())
795-
796-
def stop_ingest_job(self, job: IngestJob):
797-
"""
798-
Stop ingestion job currently resgistered in Feast
799-
Does nothing if the target job if already in a terminal state (ie suspended or aborted).
800-
Does not support stopping a job in a transitional (ie pending, suspending, aborting)
801-
or in a unknown status
802-
803-
Args:
804-
job: IngestJob to restart
805-
"""
806-
request = StopIngestionJobRequest(id=job.id)
807-
try:
808-
self._core_service.StopIngestionJob(
809-
request, metadata=self._get_grpc_metadata(),
810-
) # type: ignore
811-
except grpc.RpcError as e:
812-
raise grpc.RpcError(e.details())
813-
814739
def ingest(
815740
self,
816741
feature_set: Union[str, FeatureSet],

sdk/python/feast/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class AuthProvider(Enum):
4545
CONFIG_ENABLE_AUTH_KEY = "enable_auth"
4646
CONFIG_ENABLE_AUTH_TOKEN_KEY = "auth_token"
4747
CONFIG_CORE_SERVER_SSL_CERT_KEY = "core_server_ssl_cert"
48+
CONFIG_JC_SERVER_KEY = "jc_url"
4849
CONFIG_SERVING_URL_KEY = "serving_url"
4950
CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl"
5051
CONFIG_SERVING_SERVER_SSL_CERT_KEY = "serving_server_ssl_cert"
@@ -75,6 +76,8 @@ class AuthProvider(Enum):
7576
CONFIG_ENABLE_AUTH_KEY: "False",
7677
# Path to certificate(s) to secure connection to Feast Core
7778
CONFIG_CORE_SERVER_SSL_CERT_KEY: "",
79+
# Default Feast Job Coordinator URL
80+
CONFIG_JC_SERVER_KEY: "localhost:6570",
7881
# Default Feast Serving URL
7982
CONFIG_SERVING_URL_KEY: "localhost:6565",
8083
# Enable or disable TLS/SSL to Feast Serving

sdk/python/feast/contrib/__init__.py

Whitespace-only changes.

sdk/python/feast/contrib/job_coordinator/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)