Skip to content

Commit 05d47c2

Browse files
authored
Add protos and update CI (#23)
1 parent e1e3d0c commit 05d47c2

File tree

19 files changed

+307
-59
lines changed

19 files changed

+307
-59
lines changed

Makefile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
MVN := mvn ${MAVEN_EXTRA_OPTS}
22
ROOT_DIR := $(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))
33

4+
PROTO_TYPE_SUBDIRS = api
5+
PROTO_SERVICE_SUBDIRS = api
6+
47
# Make sure env vars are available to submakes
58
export
69

@@ -26,8 +29,14 @@ format-python:
2629
install-python-ci-dependencies:
2730
pip install --no-cache-dir -r python/requirements-ci.txt
2831

32+
compile-protos-python: install-python-ci-dependencies
33+
@$(eval FEAST_PATH=`python -c "import feast; import os; print(os.path.dirname(feast.__file__))"`)
34+
@$(foreach dir,$(PROTO_TYPE_SUBDIRS),cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. -I$(FEAST_PATH)/protos/ --python_out=../python/ --mypy_out=../python/ feast_spark/$(dir)/*.proto;)
35+
@$(foreach dir,$(PROTO_SERVICE_SUBDIRS),cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. -I$(FEAST_PATH)/protos/ --grpc_python_out=../python/ feast_spark/$(dir)/*.proto;)
36+
cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. --python_out=../python/ --grpc_python_out=../python/ --mypy_out=../python/ feast_spark/third_party/grpc/health/v1/*.proto
37+
2938
# Supports feast-dev repo master branch
30-
install-python: install-python-ci-dependencies
39+
install-python: compile-protos-python
3140
cd ${ROOT_DIR}; python -m pip install -e python
3241

3342
lint-python:

infra/docker/jobservice/Dockerfile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
FROM jupyter/pyspark-notebook:399cbb986c6b
22

33
USER root
4-
WORKDIR /feast
4+
WORKDIR /app
55

66
COPY python python
7+
COPY protos protos
78
COPY Makefile Makefile
89

910
# Install necessary tools for later steps
@@ -12,7 +13,7 @@ RUN apt-get update && apt-get -y install make git wget
1213
# Install Feast SDK
1314
RUN git init .
1415
COPY README.md README.md
15-
RUN pip install -U -e python
16+
RUN make install-python
1617
RUN pip install "s3fs" "boto3" "urllib3>=1.25.4"
1718

1819
#

infra/scripts/test-end-to-end-gcp.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ CMD=$(printf '%s' \
6363
"--core-url feast-release-feast-core:6565 " \
6464
"--serving-url feast-release-feast-online-serving:6566 " \
6565
"--job-service-url js-feast-jobservice:6568 " \
66-
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast")
66+
"--kafka-brokers 10.128.0.103:9094 --bq-project kf-feast --feast-version dev")
6767

6868
# Delete old test running pod if it exists
6969
kubectl delete pod -n "$NAMESPACE" ci-test-runner 2>/dev/null || true

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@
1010
<module>spark/ingestion</module>
1111
</modules>
1212

13+
<properties>
14+
<revision>0.9.0-SNAPSHOT</revision>
15+
</properties>
16+
1317
</project>
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//
2+
// Copyright 2018 The Feast Authors
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
syntax = "proto3";
18+
package feast_spark.api;
19+
20+
option go_package = "github.com/feast-dev/feast-spark/sdk/go/protos/feast_spark/api";
21+
option java_outer_classname = "JobServiceProto";
22+
option java_package = "feast_spark.proto.jobservice";
23+
24+
import "google/protobuf/timestamp.proto";
25+
import "feast/core/DataSource.proto";
26+
27+
28+
service JobService {
29+
// Start job to ingest data from offline store into online store
30+
rpc StartOfflineToOnlineIngestionJob (StartOfflineToOnlineIngestionJobRequest) returns (StartOfflineToOnlineIngestionJobResponse);
31+
32+
// Produce a training dataset, return a job id that will provide a file reference
33+
rpc GetHistoricalFeatures (GetHistoricalFeaturesRequest) returns (GetHistoricalFeaturesResponse);
34+
35+
// Start job to ingest data from stream into online store
36+
rpc StartStreamToOnlineIngestionJob (StartStreamToOnlineIngestionJobRequest) returns (StartStreamToOnlineIngestionJobResponse);
37+
38+
// List all types of jobs
39+
rpc ListJobs (ListJobsRequest) returns (ListJobsResponse);
40+
41+
// Cancel a single job
42+
rpc CancelJob (CancelJobRequest) returns (CancelJobResponse);
43+
44+
// Get details of a single job
45+
rpc GetJob (GetJobRequest) returns (GetJobResponse);
46+
}
47+
48+
49+
enum JobType {
50+
INVALID_JOB = 0;
51+
BATCH_INGESTION_JOB = 1;
52+
STREAM_INGESTION_JOB = 2;
53+
RETRIEVAL_JOB = 4;
54+
}
55+
56+
enum JobStatus {
57+
JOB_STATUS_INVALID = 0;
58+
// The Job has be registered and waiting to get scheduled to run
59+
JOB_STATUS_PENDING = 1;
60+
// The Job is currently processing its task
61+
JOB_STATUS_RUNNING = 2;
62+
// The Job has successfully completed its task
63+
JOB_STATUS_DONE = 3;
64+
// The Job has encountered an error while processing its task
65+
JOB_STATUS_ERROR = 4;
66+
}
67+
68+
message Job {
69+
// Identifier of the Job
70+
string id = 1;
71+
// Type of the Job
72+
JobType type = 2;
73+
// Current job status
74+
JobStatus status = 3;
75+
// Deterministic hash of the Job
76+
string hash = 4;
77+
// Start time of the Job
78+
google.protobuf.Timestamp start_time = 5;
79+
80+
message RetrievalJobMeta {
81+
string output_location = 1;
82+
}
83+
84+
message OfflineToOnlineMeta {
85+
string table_name = 1;
86+
}
87+
88+
message StreamToOnlineMeta {
89+
string table_name = 1;
90+
}
91+
92+
// JobType specific metadata on the job
93+
oneof meta {
94+
RetrievalJobMeta retrieval = 6;
95+
OfflineToOnlineMeta batch_ingestion = 7;
96+
StreamToOnlineMeta stream_ingestion = 8;
97+
}
98+
99+
// Path to Spark job logs, if available
100+
string log_uri = 9;
101+
}
102+
103+
// Ingest data from offline store into online store
104+
message StartOfflineToOnlineIngestionJobRequest {
105+
// Feature table to ingest
106+
string project = 1;
107+
string table_name = 2;
108+
109+
// Start of time range for source data from offline store
110+
google.protobuf.Timestamp start_date = 3;
111+
112+
// End of time range for source data from offline store
113+
google.protobuf.Timestamp end_date = 4;
114+
}
115+
116+
message StartOfflineToOnlineIngestionJobResponse {
117+
// Job ID assigned by Feast
118+
string id = 1;
119+
120+
// Job start time
121+
google.protobuf.Timestamp job_start_time = 2;
122+
123+
// Feature table associated with the job
124+
string table_name = 3;
125+
126+
// Path to Spark job logs, if available
127+
string log_uri = 4;
128+
}
129+
130+
message GetHistoricalFeaturesRequest {
131+
// List of feature references that are being retrieved
132+
repeated string feature_refs = 1;
133+
134+
// Batch DataSource that can be used to obtain entity values for historical retrieval.
135+
// For each entity value, a feature value will be retrieved for that value/timestamp
136+
// Only 'BATCH_*' source types are supported.
137+
// Currently only BATCH_FILE source type is supported.
138+
feast.core.DataSource entity_source = 2;
139+
140+
// Optional field to specify project name override. If specified, uses the
141+
// given project for retrieval. Overrides the projects specified in
142+
// Feature References if both are specified.
143+
string project = 3;
144+
145+
// Specifies the path in a bucket to write the exported feature data files
146+
// Export to AWS S3 - s3://path/to/features
147+
// Export to GCP GCS - gs://path/to/features
148+
string output_location = 4;
149+
150+
// Specify format name for output, e.g. parquet
151+
string output_format = 5;
152+
}
153+
154+
message GetHistoricalFeaturesResponse {
155+
// Export Job with ID assigned by Feast
156+
string id = 1;
157+
158+
// Uri to the join result output file
159+
string output_file_uri = 2;
160+
161+
// Job start time
162+
google.protobuf.Timestamp job_start_time = 3;
163+
164+
// Path to Spark job logs, if available
165+
string log_uri = 4;
166+
167+
}
168+
169+
message StartStreamToOnlineIngestionJobRequest {
170+
// Feature table to ingest
171+
string project = 1;
172+
string table_name = 2;
173+
}
174+
175+
message StartStreamToOnlineIngestionJobResponse {
176+
// Job ID assigned by Feast
177+
string id = 1;
178+
179+
// Job start time
180+
google.protobuf.Timestamp job_start_time = 2;
181+
182+
// Feature table associated with the job
183+
string table_name = 3;
184+
185+
// Path to Spark job logs, if available
186+
string log_uri = 4;
187+
}
188+
189+
message ListJobsRequest {
190+
bool include_terminated = 1;
191+
string table_name = 2;
192+
string project = 3;
193+
}
194+
195+
message ListJobsResponse {
196+
repeated Job jobs = 1;
197+
}
198+
199+
message GetJobRequest {
200+
string job_id = 1;
201+
}
202+
203+
message GetJobResponse {
204+
Job job = 1;
205+
}
206+
207+
message CancelJobRequest{
208+
string job_id = 1;
209+
}
210+
211+
message CancelJobResponse {}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
syntax = "proto3";
2+
3+
package grpc.health.v1;
4+
5+
option java_package = "io.grpc.health.v1";
6+
option java_outer_classname = "HealthProto";
7+
8+
message HealthCheckRequest {
9+
string service = 1;
10+
}
11+
12+
enum ServingStatus {
13+
UNKNOWN = 0;
14+
SERVING = 1;
15+
NOT_SERVING = 2;
16+
}
17+
18+
message HealthCheckResponse {
19+
ServingStatus status = 1;
20+
}
21+
22+
service Health {
23+
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
24+
}

python/feast_spark/api/__init__.py

Whitespace-only changes.

python/feast_spark/client.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,28 @@
22
import uuid
33
from datetime import datetime
44
from itertools import groupby
5-
from typing import TYPE_CHECKING, List, Optional, Union
5+
from typing import List, Optional, Union, cast
66

77
import pandas as pd
88

99
import feast
10+
from feast.config import Config
1011
from feast.constants import ConfigOptions as opt
11-
from feast.core.JobService_pb2 import (
12-
GetHistoricalFeaturesRequest,
13-
GetJobRequest,
14-
ListJobsRequest,
15-
StartOfflineToOnlineIngestionJobRequest,
16-
StartStreamToOnlineIngestionJobRequest,
17-
)
18-
from feast.core.JobService_pb2_grpc import JobServiceStub
1912
from feast.data_source import BigQuerySource, FileSource
2013
from feast.grpc.grpc import create_grpc_channel
2114
from feast.staging.entities import (
2215
stage_entities_to_bq,
2316
stage_entities_to_fs,
2417
table_reference_from_string,
2518
)
19+
from feast_spark.api.JobService_pb2 import (
20+
GetHistoricalFeaturesRequest,
21+
GetJobRequest,
22+
ListJobsRequest,
23+
StartOfflineToOnlineIngestionJobRequest,
24+
StartStreamToOnlineIngestionJobRequest,
25+
)
26+
from feast_spark.api.JobService_pb2_grpc import JobServiceStub
2627
from feast_spark.pyspark.abc import RetrievalJob, SparkJob
2728
from feast_spark.pyspark.launcher import (
2829
get_job_by_id,
@@ -39,9 +40,6 @@
3940
get_remote_job_from_proto,
4041
)
4142

42-
if TYPE_CHECKING:
43-
from feast.config import Config
44-
4543

4644
class Client:
4745
_feast: feast.Client
@@ -51,7 +49,7 @@ def __init__(self, feast_client: feast.Client):
5149
self._job_service_stub: Optional[JobServiceStub] = None
5250

5351
@property
54-
def config(self) -> "Config":
52+
def config(self) -> Config:
5553
return self._feast._config
5654

5755
@property
@@ -332,7 +330,7 @@ def list_jobs(
332330
return list_jobs(include_terminated, self, table_name)
333331
else:
334332
request = ListJobsRequest(
335-
include_terminated=include_terminated, table_name=table_name
333+
include_terminated=include_terminated, table_name=cast(str, table_name)
336334
)
337335
response = self._job_service.ListJobs(request)
338336
return [

0 commit comments

Comments
 (0)