Skip to content

Commit de4313b

Browse files
authored
Add historical retrieval via job service (feast-dev#1107)
* historical retrieval via job service

  Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* rename things to address comments

  Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* add test

  Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 24de261 commit de4313b

12 files changed

Lines changed: 389 additions & 82 deletions

File tree

protos/feast/core/JobService.proto

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ option java_package = "feast.proto.core";
2323

2424
import "google/protobuf/timestamp.proto";
2525
import "feast/core/DataSource.proto";
26-
import "feast/serving/ServingService.proto";
26+
2727

2828
service JobService {
2929
// Start job to ingest data from offline store into online store
@@ -38,8 +38,8 @@ service JobService {
3838
// List all types of jobs
3939
rpc ListJobs (ListJobsRequest) returns (ListJobsResponse);
4040

41-
// Stop a single job
42-
rpc StopJob (StopJobRequest) returns (StopJobResponse);
41+
// Cancel a single job
42+
rpc CancelJob (CancelJobRequest) returns (CancelJobResponse);
4343

4444
// Get details of a single job
4545
rpc GetJob (GetJobRequest) returns (GetJobResponse);
@@ -48,9 +48,9 @@ service JobService {
4848

4949
enum JobType {
5050
INVALID_JOB = 0;
51-
OFFLINE_TO_ONLINE_JOB = 1;
52-
STREAM_TO_ONLINE_JOB = 2;
53-
EXPORT_JOB = 4;
51+
BATCH_INGESTION_JOB = 1;
52+
STREAM_INGESTION_JOB = 2;
53+
RETRIEVAL_JOB = 4;
5454
}
5555

5656
enum JobStatus {
@@ -68,42 +68,26 @@ enum JobStatus {
6868
message Job {
6969
// Identifier of the Job
7070
string id = 1;
71-
// External Identifier of the Job assigned by the Spark executor
72-
string external_id = 2;
7371
// Type of the Job
74-
JobType type = 3;
72+
JobType type = 2;
7573
// Current job status
76-
JobStatus status = 4;
77-
// Timestamp on when the job was created
78-
google.protobuf.Timestamp created_timestamp = 5;
79-
// Timestamp on when the job has stopped.
80-
google.protobuf.Timestamp stop_timestamp = 6;
81-
82-
message ExportJobMeta {
83-
// Glob of the exported files that should be retrieved to reconstruct
84-
// the dataframe with retrieved features.
85-
repeated string file_glob = 1;
86-
// The Historical Features request that triggered this export job
87-
GetHistoricalFeaturesRequest request = 2;
74+
JobStatus status = 3;
75+
76+
message RetrievalJobMeta {
77+
string output_location = 4;
8878
}
8979

9080
message OfflineToOnlineMeta {
91-
// Reference to the Feature Table being populated by this job
92-
string project = 1;
93-
string table_name = 2;
9481
}
9582

9683
message StreamToOnlineMeta {
97-
// Reference to the Feature Table being populated by this job
98-
string project = 1;
99-
string table_name = 2;
10084
}
10185

10286
// JobType specific metadata on the job
10387
oneof meta {
104-
ExportJobMeta export = 7;
105-
OfflineToOnlineMeta offline_to_online = 8;
106-
StreamToOnlineMeta stream_to_online = 9;
88+
RetrievalJobMeta retrieval = 5;
89+
OfflineToOnlineMeta batch_ingestion = 6;
90+
StreamToOnlineMeta stream_ingestion = 7;
10791
}
10892
}
10993

@@ -127,13 +111,13 @@ message StartOfflineToOnlineIngestionJobResponse {
127111

128112
message GetHistoricalFeaturesRequest {
129113
// List of features that are being retrieved
130-
repeated feast.serving.FeatureReferenceV2 features = 1;
114+
repeated string feature_refs = 1;
131115

132116
// Batch DataSource that can be used to obtain entity values for historical retrieval.
133117
// For each entity value, a feature value will be retrieved for that value/timestamp
134118
// Only 'BATCH_*' source types are supported.
135119
// Currently only BATCH_FILE source type is supported.
136-
DataSource entities_source = 2;
120+
DataSource entity_source = 2;
137121

138122
// Optional field to specify project name override. If specified, uses the
139123
// given project for retrieval. Overrides the projects specified in
@@ -143,12 +127,13 @@ message GetHistoricalFeaturesRequest {
143127
// Specifies the path in a bucket to write the exported feature data files
144128
// Export to AWS S3 - s3://path/to/features
145129
// Export to GCP GCS - gs://path/to/features
146-
string destination_path = 4;
130+
string output_location = 4;
147131
}
148132

149133
message GetHistoricalFeaturesResponse {
150134
// Export Job with ID assigned by Feast
151135
string id = 1;
136+
string output_file_uri = 2;
152137
}
153138

154139
message StartStreamToOnlineIngestionJobRequest {
@@ -163,13 +148,7 @@ message StartStreamToOnlineIngestionJobResponse {
163148
}
164149

165150
message ListJobsRequest {
166-
Filter filter = 1;
167-
message Filter {
168-
// Filter jobs by job type
169-
JobType type = 1;
170-
// Filter jobs by current job status
171-
JobStatus status = 2;
172-
}
151+
bool include_terminated = 1;
173152
}
174153

175154
message ListJobsResponse {
@@ -184,14 +163,8 @@ message GetJobResponse {
184163
Job job = 1;
185164
}
186165

187-
message RestartJobRequest {
188-
string job_id = 1;
189-
}
190-
191-
message RestartJobResponse {}
192-
193-
message StopJobRequest{
166+
message CancelJobRequest{
194167
string job_id = 1;
195168
}
196169

197-
message StopJobResponse {}
170+
message CancelJobResponse {}

sdk/python/feast/cli.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,16 @@ def list_jobs():
471471
help="Path to entity df in CSV format. It is assumed to have event_timestamp column and a header.",
472472
required=True,
473473
)
474+
@click.option(
475+
"--entity-df-dtype",
476+
"-d",
477+
help="Dtypes for entity df, in JSON format",
478+
required=False,
479+
)
474480
@click.option("--destination", "-d", help="Destination", default="")
475-
def get_historical_features(features: str, entity_df_path: str, destination: str):
481+
def get_historical_features(
482+
features: str, entity_df_path: str, entity_df_dtype: str, destination: str
483+
):
476484
"""
477485
Get historical features
478486
"""
@@ -481,7 +489,14 @@ def get_historical_features(features: str, entity_df_path: str, destination: str
481489
client = Client()
482490

483491
# TODO: clean this up
484-
entity_df = pandas.read_csv(entity_df_path, sep=None, engine="python",)
492+
493+
if entity_df_dtype:
494+
dtype = json.loads(entity_df_dtype)
495+
entity_df = pandas.read_csv(
496+
entity_df_path, sep=None, engine="python", dtype=dtype
497+
)
498+
else:
499+
entity_df = pandas.read_csv(entity_df_path, sep=None, engine="python")
485500

486501
entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"])
487502

sdk/python/feast/client.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
ListProjectsResponse,
6767
)
6868
from feast.core.CoreService_pb2_grpc import CoreServiceStub
69+
from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest
6970
from feast.core.JobService_pb2_grpc import JobServiceStub
7071
from feast.data_format import ParquetFormat
7172
from feast.data_source import BigQuerySource, FileSource
@@ -86,12 +87,15 @@
8687
from feast.online_response import OnlineResponse, _infer_online_entity_rows
8788
from feast.pyspark.abc import RetrievalJob, SparkJob
8889
from feast.pyspark.launcher import (
90+
get_job_by_id,
91+
list_jobs,
8992
stage_dataframe,
9093
start_historical_feature_retrieval_job,
9194
start_historical_feature_retrieval_spark_session,
9295
start_offline_to_online_ingestion,
9396
start_stream_to_online_ingestion,
9497
)
98+
from feast.remote_job import RemoteRetrievalJob
9599
from feast.serving.ServingService_pb2 import (
96100
GetFeastServingInfoRequest,
97101
GetOnlineFeaturesRequestV2,
@@ -183,6 +187,10 @@ def _serving_service(self):
183187
self._serving_service_stub = ServingServiceStub(channel)
184188
return self._serving_service_stub
185189

190+
@property
191+
def _use_job_service(self) -> bool:
192+
return self._config.exists(CONFIG_JOB_SERVICE_URL_KEY)
193+
186194
@property
187195
def _job_service(self):
188196
"""
@@ -204,6 +212,12 @@ def _job_service(self):
204212
self._job_service_service_stub = JobServiceStub(channel)
205213
return self._job_service_service_stub
206214

215+
def _extra_grpc_params(self) -> Dict[str, Any]:
216+
return dict(
217+
timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY),
218+
metadata=self._get_grpc_metadata(),
219+
)
220+
207221
@property
208222
def core_url(self) -> str:
209223
"""
@@ -854,6 +868,7 @@ def get_historical_features(
854868
feature_refs: List[str],
855869
entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
856870
project: str = None,
871+
output_location: str = None,
857872
) -> RetrievalJob:
858873
"""
859874
Launch a historical feature retrieval job.
@@ -894,10 +909,12 @@ def get_historical_features(
894909
feature_tables = self._get_feature_tables_from_feature_refs(
895910
feature_refs, project
896911
)
897-
output_location = os.path.join(
898-
self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION),
899-
str(uuid.uuid4()),
900-
)
912+
913+
if output_location is None:
914+
output_location = os.path.join(
915+
self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION),
916+
str(uuid.uuid4()),
917+
)
901918
output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
902919

903920
if isinstance(entity_source, pd.DataFrame):
@@ -920,13 +937,30 @@ def get_historical_features(
920937
"event_timestamp", ParquetFormat(), entity_staging_uri.geturl(),
921938
)
922939

923-
return start_historical_feature_retrieval_job(
924-
self,
925-
entity_source,
926-
feature_tables,
927-
output_format,
928-
os.path.join(output_location, str(uuid.uuid4())),
929-
)
940+
if self._use_job_service:
941+
response = self._job_service.GetHistoricalFeatures(
942+
GetHistoricalFeaturesRequest(
943+
feature_refs=feature_refs,
944+
entity_source=entity_source.to_proto(),
945+
project=project,
946+
output_location=output_location,
947+
),
948+
**self._extra_grpc_params(),
949+
)
950+
return RemoteRetrievalJob(
951+
self._job_service,
952+
self._extra_grpc_params,
953+
response.id,
954+
output_file_uri=response.output_file_uri,
955+
)
956+
else:
957+
return start_historical_feature_retrieval_job(
958+
self,
959+
entity_source,
960+
feature_tables,
961+
output_format,
962+
os.path.join(output_location, str(uuid.uuid4())),
963+
)
930964

931965
def get_historical_features_df(
932966
self,
@@ -1009,6 +1043,12 @@ def start_stream_to_online_ingestion(
10091043
) -> SparkJob:
10101044
return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
10111045

1046+
def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
1047+
return list_jobs(include_terminated, self)
1048+
1049+
def get_job_by_id(self, job_id: str) -> SparkJob:
1050+
return get_job_by_id(job_id, self)
1051+
10121052
def stage_dataframe(
10131053
self, df: pd.DataFrame, event_timestamp_column: str,
10141054
) -> FileSource:

sdk/python/feast/constants.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class AuthProvider(Enum):
116116
# Path to certificate(s) to secure connection to Feast Serving
117117
CONFIG_SERVING_SERVER_SSL_CERT_KEY: "",
118118
# Default connection timeout to Feast Serving and Feast Core (in seconds)
119-
CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3",
119+
CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10",
120120
# Default gRPC connection timeout when sending an ApplyFeatureSet command to
121121
# Feast Core (in seconds)
122122
CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600",
@@ -133,4 +133,8 @@ class AuthProvider(Enum):
133133
CONFIG_REDIS_SSL: "False",
134134
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
135135
CONFIG_SPARK_EXTRA_OPTIONS: "",
136+
# Enable or disable TLS/SSL to Feast Service
137+
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
138+
# Path to certificate(s) to secure connection to Feast Job Service
139+
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
136140
}

0 commit comments

Comments (0)