11import os
2+ import socket
23import subprocess
34import uuid
5+ from contextlib import closing
6+
7+ import requests
8+ from requests .exceptions import RequestException
49
510from feast .pyspark .abc import (
611 BatchIngestionJob ,
1621)
1722
1823
24+ def _find_free_port ():
25+ with closing (socket .socket (socket .AF_INET , socket .SOCK_STREAM )) as s :
26+ s .bind (("" , 0 ))
27+ s .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
28+ return s .getsockname ()[1 ]
29+
30+
class StandaloneClusterJobMixin:
    """Shared behaviour for jobs spawned as local ``spark-submit`` driver processes."""

    # Per-request timeout (seconds) for Spark UI REST calls; without it,
    # requests.get() can block get_status() indefinitely if the UI port
    # is open but unresponsive.
    _UI_REQUEST_TIMEOUT_SEC = 5

    def __init__(
        self, job_id: str, job_name: str, process: subprocess.Popen, ui_port: int = None
    ):
        """
        Args:
            job_id (str): Identifier assigned by the launcher.
            job_name (str): Spark application name; used to locate the
                application via the Spark UI REST API.
            process (subprocess.Popen): The spark-submit driver process.
            ui_port (int): Optional Spark UI port. When provided, get_status()
                polls the UI REST API to distinguish STARTING from IN_PROGRESS.
        """
        self._job_id = job_id
        self._job_name = job_name
        self._process = process
        self._ui_port = ui_port

    def get_id(self) -> str:
        """Return the launcher-assigned job id."""
        return self._job_id

    def check_if_started(self):
        """Return True once the Spark application is visible in the UI REST
        API and has at least one stage.

        With no UI port to poll, the job is optimistically treated as started.
        Any UI request failure is treated as "not started yet" — previously
        only the first request was guarded, so an error while fetching stages
        leaked out of get_status().
        """
        if not self._ui_port:
            return True

        base_url = f"http://localhost:{self._ui_port}/api/v1/applications"
        try:
            applications = requests.get(
                base_url, timeout=self._UI_REQUEST_TIMEOUT_SEC
            ).json()

            app = next(
                iter(app for app in applications if app["name"] == self._job_name),
                None,
            )
            if not app:
                return False

            stages = requests.get(
                f"{base_url}/{app['id']}/stages",
                timeout=self._UI_REQUEST_TIMEOUT_SEC,
            ).json()
        except RequestException:
            # UI not reachable (yet), or mid-flight error — report not started
            # rather than propagating to get_status() callers.
            return False

        return bool(stages)

    def get_status(self) -> SparkJobStatus:
        """Derive job status from the driver process exit code and, while the
        process is alive, from the Spark UI (STARTING vs IN_PROGRESS)."""
        code = self._process.poll()
        if code is None:
            if not self.check_if_started():
                return SparkJobStatus.STARTING

            return SparkJobStatus.IN_PROGRESS

        if code != 0:
            return SparkJobStatus.FAILED

        return SparkJobStatus.COMPLETED

    def cancel(self):
        """Cancel the job by terminating (SIGTERM) the driver process."""
        self._process.terminate()
80+
3781
class StandaloneClusterBatchIngestionJob(StandaloneClusterJobMixin, BatchIngestionJob):
    """Handle to a batch ingestion job running on a standalone Spark cluster.

    All behaviour comes from StandaloneClusterJobMixin and BatchIngestionJob.
    """
88+
89+
class StandaloneClusterStreamingIngestionJob(
    StandaloneClusterJobMixin, StreamIngestionJob
):
    """Handle to a streaming ingestion job running on a standalone Spark cluster.

    All behaviour comes from StandaloneClusterJobMixin and StreamIngestionJob.
    """
@@ -48,7 +102,13 @@ class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob):
48102 Historical feature retrieval job result for a standalone spark cluster
49103 """
50104
51- def __init__ (self , job_id : str , process : subprocess .Popen , output_file_uri : str ):
105+ def __init__ (
106+ self ,
107+ job_id : str ,
108+ job_name : str ,
109+ process : subprocess .Popen ,
110+ output_file_uri : str ,
111+ ):
52112 """
53113 This is the returned historical feature retrieval job result for StandaloneClusterLauncher.
54114
@@ -57,7 +117,7 @@ def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str)
57117 process (subprocess.Popen): Pyspark driver process, spawned by the launcher.
58118 output_file_uri (str): Uri to the historical feature retrieval job output file.
59119 """
60- super ().__init__ (job_id , process )
120+ super ().__init__ (job_id , job_name , process )
61121 self ._output_file_uri = output_file_uri
62122
63123 def get_output_file_uri (self , timeout_sec : int = None ):
@@ -100,7 +160,9 @@ def __init__(self, master_url: str, spark_home: str = None):
    def spark_submit_script_path(self):
        """Full path to the ``spark-submit`` executable under ``self.spark_home``."""
        return os.path.join(self.spark_home, "bin/spark-submit")
102162
103- def spark_submit (self , job_params : SparkJobParameters ) -> subprocess .Popen :
163+ def spark_submit (
164+ self , job_params : SparkJobParameters , ui_port : int = None
165+ ) -> subprocess .Popen :
104166 submission_cmd = [
105167 self .spark_submit_script_path ,
106168 "--master" ,
@@ -112,6 +174,9 @@ def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
112174 if job_params .get_class_name ():
113175 submission_cmd .extend (["--class" , job_params .get_class_name ()])
114176
177+ if ui_port :
178+ submission_cmd .extend (["--conf" , f"spark.ui.port={ ui_port } " ])
179+
115180 submission_cmd .append (job_params .get_main_file_path ())
116181 submission_cmd .extend (job_params .get_arguments ())
117182
@@ -122,19 +187,35 @@ def historical_feature_retrieval(
122187 ) -> RetrievalJob :
123188 job_id = str (uuid .uuid4 ())
124189 return StandaloneClusterRetrievalJob (
125- job_id , self .spark_submit (job_params ), job_params .get_destination_path ()
190+ job_id ,
191+ job_params .get_name (),
192+ self .spark_submit (job_params ),
193+ job_params .get_destination_path (),
126194 )
127195
128196 def offline_to_online_ingestion (
129- self , job_params : BatchIngestionJobParameters
197+ self , ingestion_job_params : BatchIngestionJobParameters
130198 ) -> BatchIngestionJob :
131199 job_id = str (uuid .uuid4 ())
132- return StandaloneClusterBatchIngestionJob (job_id , self .spark_submit (job_params ))
200+ ui_port = _find_free_port ()
201+ return StandaloneClusterBatchIngestionJob (
202+ job_id ,
203+ ingestion_job_params .get_name (),
204+ self .spark_submit (ingestion_job_params , ui_port ),
205+ ui_port ,
206+ )
133207
134208 def start_stream_to_online_ingestion (
135209 self , ingestion_job_params : StreamIngestionJobParameters
136210 ) -> StreamIngestionJob :
137- raise NotImplementedError
211+ job_id = str (uuid .uuid4 ())
212+ ui_port = _find_free_port ()
213+ return StandaloneClusterStreamingIngestionJob (
214+ job_id ,
215+ ingestion_job_params .get_name (),
216+ self .spark_submit (ingestion_job_params , ui_port ),
217+ ui_port ,
218+ )
138219
139220 def stage_dataframe (
140221 self , df , event_timestamp_column : str , created_timestamp_column : str ,
0 commit comments