Skip to content

Commit 69d3f47

Browse files
authored
Streaming Ingestion Job supports AVRO format as input (feast-dev#1072)
* dataformat in spark job Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * clean Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * avro format supported by streaming job Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * better check if started Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * lint Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * ci deps Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * clean some python deps Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * use develop version Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 2060053 commit 69d3f47

File tree

22 files changed

+419
-71
lines changed

22 files changed

+419
-71
lines changed

.github/workflows/master_only.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,15 @@ jobs:
7979
- uses: stCarolas/setup-maven@v3
8080
with:
8181
maven-version: 3.6.3
82-
- name: build-jar
82+
- name: Publish develop version of ingestion job
83+
run: |
84+
if [ ${GITHUB_REF#refs/*/} == "master" ]; then
85+
make build-java-no-tests REVISION=develop
86+
gsutil cp ./spark/ingestion/target/feast-ingestion-spark-develop.jar gs://${PUBLISH_BUCKET}/spark/ingestion/
87+
fi
88+
- name: Get version
89+
run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF#refs/*/}
90+
- name: Publish tagged version of ingestion job
8391
run: |
8492
SEMVER_REGEX='^v[0-9]+\.[0-9]+\.[0-9]+(-([0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*))?$'
8593
if echo "${RELEASE_VERSION}" | grep -P "$SEMVER_REGEX" &>/dev/null ; then

core/src/main/java/feast/core/validators/DataSourceValidator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public static void validate(DataSource spec) {
5151
spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(),
5252
"FeatureTable");
5353
break;
54+
case AVRO_FORMAT:
55+
break;
5456
default:
5557
throw new UnsupportedOperationException(
5658
String.format(
@@ -68,6 +70,8 @@ public static void validate(DataSource spec) {
6870
spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(),
6971
"FeatureTable");
7072
break;
73+
case AVRO_FORMAT:
74+
break;
7175
default:
7276
throw new UnsupportedOperationException(
7377
String.format("Unsupported Stream Format for Kafka Source Type: %s", recordFormat));

infra/charts/feast/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ feast-core:
44

55
feast-jobcontroller:
66
# feast-jobcontroller.enabled -- Flag to install Feast Job Controller
7-
enabled: true
7+
enabled: false
88

99
feast-online-serving:
1010
# feast-online-serving.enabled -- Flag to install Feast Online Serving

sdk/python/feast/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class AuthProvider(Enum):
123123
# Authentication Provider - Google OpenID/OAuth
124124
CONFIG_AUTH_PROVIDER: "google",
125125
CONFIG_SPARK_LAUNCHER: "dataproc",
126-
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-0.8-SNAPSHOT.jar",
126+
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-develop.jar",
127127
CONFIG_REDIS_HOST: "localhost",
128128
CONFIG_REDIS_PORT: "6379",
129129
CONFIG_REDIS_SSL: "False",

sdk/python/feast/pyspark/abc.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SparkJobFailure(Exception):
1919

2020

2121
class SparkJobStatus(Enum):
22+
STARTING = 0
2223
IN_PROGRESS = 1
2324
FAILED = 2
2425
COMPLETED = 3
@@ -48,6 +49,13 @@ def get_status(self) -> SparkJobStatus:
4849
"""
4950
raise NotImplementedError
5051

52+
@abc.abstractmethod
53+
def cancel(self):
54+
"""
55+
Manually terminate job
56+
"""
57+
raise NotImplementedError
58+
5159

5260
class SparkJobParameters(abc.ABC):
5361
@abc.abstractmethod

sdk/python/feast/pyspark/historical_feature_retrieval_job.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ class FileSource(Source):
7575
options (Optional[Dict[str, str]]): Options to be passed to spark while reading the file source.
7676
"""
7777

78+
PROTO_FORMAT_TO_SPARK = {
79+
"ParquetFormat": "parquet",
80+
"AvroFormat": "avro",
81+
"CSVFormat": "csv",
82+
}
83+
7884
def __init__(
7985
self,
8086
format: str,
@@ -147,7 +153,7 @@ def spark_path(self) -> str:
147153
def _source_from_dict(dct: Dict) -> Source:
148154
if "file" in dct.keys():
149155
return FileSource(
150-
dct["file"]["format"],
156+
FileSource.PROTO_FORMAT_TO_SPARK[dct["file"]["format"]["json_class"]],
151157
dct["file"]["path"],
152158
dct["file"]["event_timestamp_column"],
153159
dct["file"].get("created_timestamp_column"),
@@ -635,20 +641,20 @@ def retrieve_historical_features(
635641
636642
Example:
637643
>>> entity_source_conf = {
638-
"format": "csv",
644+
"format": {"jsonClass": "ParquetFormat"},
639645
"path": "file:///some_dir/customer_driver_pairs.csv"),
640646
"options": {"inferSchema": "true", "header": "true"},
641647
"field_mapping": {"id": "driver_id"}
642648
}
643649
644650
>>> feature_tables_sources_conf = [
645651
{
646-
"format": "parquet",
652+
"format": {"json_class": "ParquetFormat"},
647653
"path": "gs://some_bucket/bookings.parquet"),
648654
"field_mapping": {"id": "driver_id"}
649655
},
650656
{
651-
"format": "avro",
657+
"format": {"json_class": "AvroFormat", schema_json: "..avro schema.."},
652658
"path": "s3://some_bucket/transactions.avro"),
653659
}
654660
]

sdk/python/feast/pyspark/launcher.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,6 @@ def resolve_launcher(config: Config) -> JobLauncher:
8686
return _launchers[config.get(CONFIG_SPARK_LAUNCHER)](config)
8787

8888

89-
_SOURCES = {FileSource: "file", BigQuerySource: "bq", KafkaSource: "kafka"}
90-
91-
9289
def _source_to_argument(source: DataSource):
9390
common_properties = {
9491
"field_mapping": dict(source.field_mapping),
@@ -97,20 +94,28 @@ def _source_to_argument(source: DataSource):
9794
"date_partition_column": source.date_partition_column,
9895
}
9996

100-
kind = _SOURCES[type(source)]
10197
properties = {**common_properties}
98+
10299
if isinstance(source, FileSource):
103100
properties["path"] = source.file_options.file_url
104-
properties["format"] = str(source.file_options.file_format)
105-
return {kind: properties}
101+
properties["format"] = dict(
102+
json_class=source.file_options.file_format.__class__.__name__
103+
)
104+
return {"file": properties}
105+
106106
if isinstance(source, BigQuerySource):
107107
properties["table_ref"] = source.bigquery_options.table_ref
108-
return {kind: properties}
108+
return {"bq": properties}
109+
109110
if isinstance(source, KafkaSource):
110-
properties["topic"] = source.kafka_options.topic
111-
properties["classpath"] = source.kafka_options.class_path
112111
properties["bootstrap_servers"] = source.kafka_options.bootstrap_servers
113-
return {kind: properties}
112+
properties["topic"] = source.kafka_options.topic
113+
properties["format"] = {
114+
**source.kafka_options.message_format.__dict__,
115+
"json_class": source.kafka_options.message_format.__class__.__name__,
116+
}
117+
return {"kafka": properties}
118+
114119
raise NotImplementedError(f"Unsupported Datasource: {type(source)}")
115120

116121

sdk/python/feast/pyspark/launchers/aws/emr.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ def get_status(self) -> SparkJobStatus:
6262
# we should never get here
6363
raise Exception("Invalid EMR state")
6464

65+
def cancel(self):
66+
raise NotImplementedError
67+
6568

6669
class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
6770
"""

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ def get_status(self) -> SparkJobStatus:
4545

4646
return SparkJobStatus.FAILED
4747

48+
def cancel(self):
49+
self._operation.cancel()
50+
4851

4952
class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
5053
"""
@@ -71,7 +74,13 @@ def get_output_file_uri(self, timeout_sec=None):
7174

7275
class DataprocBatchIngestionJob(DataprocJobMixin, BatchIngestionJob):
7376
"""
74-
Ingestion job result for a Dataproc cluster
77+
Batch Ingestion job result for a Dataproc cluster
78+
"""
79+
80+
81+
class DataprocStreamingIngestionJob(DataprocJobMixin, StreamIngestionJob):
82+
"""
83+
Streaming Ingestion job result for a Dataproc cluster
7584
"""
7685

7786

@@ -151,14 +160,14 @@ def historical_feature_retrieval(
151160
)
152161

153162
def offline_to_online_ingestion(
154-
self, job_params: BatchIngestionJobParameters
163+
self, ingestion_job_params: BatchIngestionJobParameters
155164
) -> BatchIngestionJob:
156-
return DataprocBatchIngestionJob(self.dataproc_submit(job_params))
165+
return DataprocBatchIngestionJob(self.dataproc_submit(ingestion_job_params))
157166

158167
def start_stream_to_online_ingestion(
159168
self, ingestion_job_params: StreamIngestionJobParameters
160169
) -> StreamIngestionJob:
161-
raise NotImplementedError
170+
return DataprocStreamingIngestionJob(self.dataproc_submit(ingestion_job_params))
162171

163172
def stage_dataframe(
164173
self, df, event_timestamp_column: str, created_timestamp_column: str,

sdk/python/feast/pyspark/launchers/standalone/local.py

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import os
2+
import socket
23
import subprocess
34
import uuid
5+
from contextlib import closing
6+
7+
import requests
8+
from requests.exceptions import RequestException
49

510
from feast.pyspark.abc import (
611
BatchIngestionJob,
@@ -16,28 +21,77 @@
1621
)
1722

1823

24+
def _find_free_port():
25+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
26+
s.bind(("", 0))
27+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
28+
return s.getsockname()[1]
29+
30+
1931
class StandaloneClusterJobMixin:
20-
def __init__(self, job_id: str, process: subprocess.Popen):
32+
def __init__(
33+
self, job_id: str, job_name: str, process: subprocess.Popen, ui_port: int = None
34+
):
2135
self._job_id = job_id
36+
self._job_name = job_name
2237
self._process = process
38+
self._ui_port = ui_port
2339

2440
def get_id(self) -> str:
2541
return self._job_id
2642

43+
def check_if_started(self):
44+
if not self._ui_port:
45+
return True
46+
47+
try:
48+
applications = requests.get(
49+
f"http://localhost:{self._ui_port}/api/v1/applications"
50+
).json()
51+
except RequestException:
52+
return False
53+
54+
app = next(
55+
iter(app for app in applications if app["name"] == self._job_name), None
56+
)
57+
if not app:
58+
return False
59+
60+
stages = requests.get(
61+
f"http://localhost:{self._ui_port}/api/v1/applications/{app['id']}/stages"
62+
).json()
63+
return bool(stages)
64+
2765
def get_status(self) -> SparkJobStatus:
2866
code = self._process.poll()
2967
if code is None:
68+
if not self.check_if_started():
69+
return SparkJobStatus.STARTING
70+
3071
return SparkJobStatus.IN_PROGRESS
3172

3273
if code != 0:
3374
return SparkJobStatus.FAILED
3475

3576
return SparkJobStatus.COMPLETED
3677

78+
def cancel(self):
79+
self._process.terminate()
80+
3781

3882
class StandaloneClusterBatchIngestionJob(StandaloneClusterJobMixin, BatchIngestionJob):
3983
"""
40-
Ingestion job result for a standalone spark cluster
84+
Batch Ingestion job result for a standalone spark cluster
85+
"""
86+
87+
pass
88+
89+
90+
class StandaloneClusterStreamingIngestionJob(
91+
StandaloneClusterJobMixin, StreamIngestionJob
92+
):
93+
"""
94+
Streaming Ingestion job result for a standalone spark cluster
4195
"""
4296

4397
pass
@@ -48,7 +102,13 @@ class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob):
48102
Historical feature retrieval job result for a standalone spark cluster
49103
"""
50104

51-
def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str):
105+
def __init__(
106+
self,
107+
job_id: str,
108+
job_name: str,
109+
process: subprocess.Popen,
110+
output_file_uri: str,
111+
):
52112
"""
53113
This is the returned historical feature retrieval job result for StandaloneClusterLauncher.
54114
@@ -57,7 +117,7 @@ def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str)
57117
process (subprocess.Popen): Pyspark driver process, spawned by the launcher.
58118
output_file_uri (str): Uri to the historical feature retrieval job output file.
59119
"""
60-
super().__init__(job_id, process)
120+
super().__init__(job_id, job_name, process)
61121
self._output_file_uri = output_file_uri
62122

63123
def get_output_file_uri(self, timeout_sec: int = None):
@@ -100,7 +160,9 @@ def __init__(self, master_url: str, spark_home: str = None):
100160
def spark_submit_script_path(self):
101161
return os.path.join(self.spark_home, "bin/spark-submit")
102162

103-
def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
163+
def spark_submit(
164+
self, job_params: SparkJobParameters, ui_port: int = None
165+
) -> subprocess.Popen:
104166
submission_cmd = [
105167
self.spark_submit_script_path,
106168
"--master",
@@ -112,6 +174,9 @@ def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
112174
if job_params.get_class_name():
113175
submission_cmd.extend(["--class", job_params.get_class_name()])
114176

177+
if ui_port:
178+
submission_cmd.extend(["--conf", f"spark.ui.port={ui_port}"])
179+
115180
submission_cmd.append(job_params.get_main_file_path())
116181
submission_cmd.extend(job_params.get_arguments())
117182

@@ -122,19 +187,35 @@ def historical_feature_retrieval(
122187
) -> RetrievalJob:
123188
job_id = str(uuid.uuid4())
124189
return StandaloneClusterRetrievalJob(
125-
job_id, self.spark_submit(job_params), job_params.get_destination_path()
190+
job_id,
191+
job_params.get_name(),
192+
self.spark_submit(job_params),
193+
job_params.get_destination_path(),
126194
)
127195

128196
def offline_to_online_ingestion(
129-
self, job_params: BatchIngestionJobParameters
197+
self, ingestion_job_params: BatchIngestionJobParameters
130198
) -> BatchIngestionJob:
131199
job_id = str(uuid.uuid4())
132-
return StandaloneClusterBatchIngestionJob(job_id, self.spark_submit(job_params))
200+
ui_port = _find_free_port()
201+
return StandaloneClusterBatchIngestionJob(
202+
job_id,
203+
ingestion_job_params.get_name(),
204+
self.spark_submit(ingestion_job_params, ui_port),
205+
ui_port,
206+
)
133207

134208
def start_stream_to_online_ingestion(
135209
self, ingestion_job_params: StreamIngestionJobParameters
136210
) -> StreamIngestionJob:
137-
raise NotImplementedError
211+
job_id = str(uuid.uuid4())
212+
ui_port = _find_free_port()
213+
return StandaloneClusterStreamingIngestionJob(
214+
job_id,
215+
ingestion_job_params.get_name(),
216+
self.spark_submit(ingestion_job_params, ui_port),
217+
ui_port,
218+
)
138219

139220
def stage_dataframe(
140221
self, df, event_timestamp_column: str, created_timestamp_column: str,

0 commit comments

Comments
 (0)