Skip to content

Commit 1cef42c

Browse files
committed
Add job.py
1 parent 2d1ed7e commit 1cef42c

2 files changed

Lines changed: 127 additions & 2 deletions

File tree

sdk/python/feast/job.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import tempfile
2+
import time
3+
from datetime import datetime, timedelta
4+
5+
import pandas as pd
6+
from fastavro import reader as fastavro_reader
7+
from google.cloud import storage
8+
9+
from feast.serving.ServingService_pb2 import (
10+
Job as JobProto,
11+
JOB_STATUS_DONE,
12+
DATA_FORMAT_AVRO,
13+
)
14+
from feast.serving.ServingService_pb2 import ReloadJobRequest
15+
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
16+
17+
# TODO: Need to profile and check the performance and memory consumption of
18+
# the current approach to read files into pandas DataFrame or iterate the
19+
# data row by row.
20+
21+
# Maximum no of seconds to wait until the jobs status is DONE in Feast
22+
DEFAULT_TIMEOUT_SEC: int = 86400
23+
24+
# Maximum no of seconds to wait before reloading the job status in Feast
25+
MAX_WAIT_INTERVAL_SEC: int = 60
26+
27+
28+
class Job:
29+
"""
30+
A class representing a job for feature retrieval in Feast.
31+
"""
32+
33+
# noinspection PyShadowingNames
34+
def __init__(
35+
self,
36+
job_proto: JobProto,
37+
serving_stub: ServingServiceStub,
38+
storage_client: storage.Client,
39+
):
40+
"""
41+
Args:
42+
job_proto: Job proto object (wrapped by this job object)
43+
serving_stub: Stub for Feast serving service
44+
storage_client: Google Cloud Storage client
45+
"""
46+
self.job_proto = job_proto
47+
self.serving_stub = serving_stub
48+
self.storage_client = storage_client
49+
50+
@property
51+
def id(self):
52+
return self.job_proto.id
53+
54+
@property
55+
def status(self):
56+
return self.job_proto.status
57+
58+
def reload(self):
59+
"""
60+
Reload the latest job status
61+
Returns: None
62+
"""
63+
self.job_proto = self.serving_stub.GetJob(
64+
ReloadJobRequest(job=self.job_proto)
65+
).job
66+
67+
def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
68+
"""
69+
Wait until job is done to get an iterable rows of result
70+
The row represents can only represent an Avro row in Feast 0.3.
71+
72+
Args:
73+
timeout_sec: max no of seconds to wait until job is done. If "timeout_sec" is exceeded, an exception will be raised.
74+
75+
Returns: Iterable of Avro rows
76+
77+
"""
78+
max_wait_datetime = datetime.now() + timedelta(seconds=timeout_sec)
79+
wait_duration_sec = 2
80+
81+
while self.status != JOB_STATUS_DONE:
82+
if datetime.now() > max_wait_datetime:
83+
raise Exception(
84+
"Timeout exceeded while waiting for result. Please retry this method or use a longer timeout value."
85+
)
86+
87+
self.reload()
88+
time.sleep(wait_duration_sec)
89+
# Backoff the wait duration exponentially up till MAX_WAIT_INTERVAL_SEC
90+
wait_duration_sec = min(wait_duration_sec * 2, MAX_WAIT_INTERVAL_SEC)
91+
92+
if self.job_proto.error:
93+
raise Exception(self.job_proto.error)
94+
95+
if self.job_proto.data_format != DATA_FORMAT_AVRO:
96+
raise Exception(
97+
"Feast only supports Avro data format for now. Please check "
98+
"your Feast Serving deployment."
99+
)
100+
101+
for file_uri in self.job_proto.file_uris:
102+
if not file_uri.startswith("gs://"):
103+
raise Exception(
104+
"Feast only supports reading from Google Cloud "
105+
"Storage for now. Please check your Feast Serving deployment."
106+
)
107+
with tempfile.TemporaryFile() as file_obj:
108+
self.storage_client.download_blob_to_file(file_uri, file_obj)
109+
file_obj.seek(0)
110+
avro_reader = fastavro_reader(file_obj)
111+
for record in avro_reader:
112+
yield record
113+
114+
def to_dataframe(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
115+
"""
116+
Wait until job is done to get an interable rows of result
117+
Args:
118+
timeout_sec: max no of seconds to wait until job is done. If "timeout_sec" is exceeded, an exception will be raised.
119+
Returns: pandas Dataframe of the feature values
120+
"""
121+
records = [r for r in self.result()]
122+
return pd.DataFrame.from_records(records)
123+
124+
def __iter__(self):
125+
return iter(self.result())

sdk/python/setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"google-api-core==1.*",
3030
"google-auth==1.*",
3131
"google-cloud-bigquery==1.*",
32-
"google-cloud-storage==1.*",
32+
"google-cloud-storage==1.20.*",
3333
"googleapis-common-protos==1.*",
3434
"google-cloud-bigquery-storage==0.*",
3535
"grpcio==1.*",
@@ -42,7 +42,7 @@
4242
"tqdm==4.*",
4343
"numpy",
4444
"google",
45-
"kafka-python",
45+
"kafka-python==1.4.*",
4646
]
4747

4848
setup(

0 commit comments

Comments
 (0)