Skip to content

Commit 6499fb2

Browse files
lock manager for job service (#135)
* Update version to v0.2.24 (#132) Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> Co-authored-by: Khor Shu Heng <khor.heng@gojek.com> Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> * distributed lock for starting/stopping streaming ingestion jobs Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> * unit tests for lock manager Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> * Perform data type conversion automatically (#133) * Perform data type conversion automatically Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> * Use wheel installation for local setup to avoid module not found issue Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> Co-authored-by: Khor Shu Heng <khor.heng@gojek.com> Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> * format python files Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> * install feast-spark from setup.py Signed-off-by: KeshavSharma <keshav.sharma@gojek.com> Co-authored-by: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Co-authored-by: Khor Shu Heng <khor.heng@gojek.com> Co-authored-by: KeshavSharma <keshav.sharma@gojek.com>
1 parent 7d3aa9d commit 6499fb2

File tree

6 files changed

+247
-18
lines changed

6 files changed

+247
-18
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ install-python-ci-dependencies:
3434

3535
# Supports feast-dev repo master branch
3636
install-python: install-python-ci-dependencies
37-
pip install --user --upgrade setuptools wheel grpcio-tools mypy-protobuf
38-
cd ${ROOT_DIR}/python; rm -rf dist; python setup.py bdist_wheel; pip install --find-links=dist feast-spark
37+
pip install --user --upgrade setuptools wheel
38+
cd ${ROOT_DIR}/python; rm -rf dist; python setup.py install
3939

4040
lint-python:
4141
cd ${ROOT_DIR}/python ; mypy feast_spark/ tests/

python/feast_spark/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ class ConfigOptions(metaclass=ConfigMeta):
111111
#: Default Redis port to Redis Instance which stores Spark Ingestion Job metrics
112112
SPARK_METRICS_REDIS_PORT: Optional[str] = None
113113

114+
#: Host to Redis Instance which stores locks for job management
115+
LOCK_MGR_REDIS_HOST: Optional[str] = None
116+
117+
#: Port to Redis Instance which stores locks for job management
118+
LOCK_MGR_REDIS_PORT: Optional[str] = None
119+
120+
#: TTL for locks for job management
121+
LOCK_EXPIRY: Optional[str] = "60"
122+
114123
#: File format of historical retrieval features
115124
HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"
116125

python/feast_spark/job_service.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
UnscheduleOfflineToOnlineIngestionJobResponse,
4141
)
4242
from feast_spark.constants import ConfigOptions as opt
43+
from feast_spark.lock_manager import JobOperation, JobOperationLock
4344
from feast_spark.metrics import (
4445
job_schedule_count,
4546
job_submission_count,
@@ -548,14 +549,28 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
548549
f"expected_job_hashes = {sorted(list(expected_job_hashes))}"
549550
)
550551

552+
lock_config = {
553+
"redis_host": client.config.get(opt.LOCK_MGR_REDIS_HOST),
554+
"redis_port": client.config.getint(opt.LOCK_MGR_REDIS_PORT),
555+
"lock_expiry": client.config.getint(opt.LOCK_EXPIRY),
556+
}
557+
551558
for job_hash in job_hashes_to_start:
552559
# Any job that we wish to start should be among expected table refs map
553560
project, feature_table = expected_job_hash_to_tables[job_hash]
554-
logger.warning(
555-
f"Starting a stream ingestion job for project={project}, "
556-
f"table_name={feature_table.name} with job_hash={job_hash}"
557-
)
558-
client.start_stream_to_online_ingestion(feature_table, [], project=project)
561+
562+
# start the job if lock is available
563+
with JobOperationLock(
564+
job_hash=job_hash, operation=JobOperation.START, **lock_config
565+
) as lock:
566+
if lock:
567+
logger.warning(
568+
f"Starting a stream ingestion job for project={project}, "
569+
f"table_name={feature_table.name} with job_hash={job_hash}"
570+
)
571+
client.start_stream_to_online_ingestion(
572+
feature_table, [], project=project
573+
)
559574

560575
# prevent scheduler from peak load
561576
time.sleep(client.config.getint(opt.JOB_SERVICE_PAUSE_BETWEEN_JOBS))
@@ -572,6 +587,10 @@ def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
572587
f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}"
573588
)
574589
try:
575-
job.cancel()
590+
with JobOperationLock(
591+
job_hash=job_hash, operation=JobOperation.CANCEL, **lock_config
592+
) as lock:
593+
if lock:
594+
job.cancel()
576595
except FailedPrecondition as exc:
577596
logger.error(f"Job canceling failed with exception {exc}")

python/feast_spark/lock_manager.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""
2+
Classes to manage distributed locks
3+
"""
4+
import enum
5+
import logging
6+
import secrets
7+
import time
8+
9+
import redis
10+
from redis.exceptions import ConnectionError
11+
12+
# retries for acquiring lock
13+
LOCK_ACQUIRE_RETRIES = 3
14+
# wait between retries
15+
LOCK_ACQUIRE_WAIT = 1
16+
LOCK_KEY_PREFIX = "lock"
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class JobOperation(enum.Enum):
    """Operations on a spark ingestion job that can be guarded by a lock."""

    START = "st"
    CANCEL = "cn"
28+
29+
30+
class JobOperationLock:
    """
    Distributed lock guarding start/cancel operations on spark ingestion jobs.

    Implemented as a context manager so that the lock is released
    automatically once the guarded operation completes.

    Usage:
        with JobOperationLock(job_hash, <start/cancel>):
            client.start_stream_to_online_ingestion(feature_table, [], project=project)
    """

    def __init__(
        self,
        redis_host: str,
        redis_port: int,
        lock_expiry: int,
        job_hash: str,
        operation: JobOperation = JobOperation.START,
    ):
        """
        Build the redis client and the key/value pair used for this lock.

        Args:
            redis_host: host of the redis instance that stores locks
            redis_port: port of the redis instance that stores locks
            lock_expiry: TTL in seconds after which the lock auto-releases
            job_hash: hash string of the job to be operated upon
            operation: job operation the lock protects <START/CANCEL>
        """
        self._redis = redis.Redis(host=redis_host, port=redis_port)
        self._lock_expiry = lock_expiry
        # Key encodes prefix + operation + job hash, so the start and cancel
        # of the same job are independent locks.
        self._lock_key = f"{LOCK_KEY_PREFIX}_{operation.value}_{job_hash}"
        # Random 8-byte token: only the agent that created the lock knows it,
        # so only that agent can delete it.
        self._lock_value = secrets.token_hex(nbytes=8)

    def __enter__(self):
        """
        Context-manager setup: try to acquire the lock.

        Returns the lock token (truthy) on success, False when the lock is
        already held or redis stays unreachable.

        SET is issued with NX so the key is written only if it does not exist
        (a held lock is never overwritten) and with EX so the key expires
        after the configured TTL (automatic release).

        Connection failures are retried up to LOCK_ACQUIRE_RETRIES times,
        waiting LOCK_ACQUIRE_WAIT seconds between attempts.
        """
        for _ in range(LOCK_ACQUIRE_RETRIES):
            try:
                acquired = self._redis.set(
                    name=self._lock_key,
                    value=self._lock_value,
                    nx=True,
                    ex=self._lock_expiry,
                )
            except ConnectionError:
                # wait before attempting to retry
                logger.warning(
                    f"connection error while acquiring lock: {self._lock_key}"
                )
                time.sleep(LOCK_ACQUIRE_WAIT)
                continue
            if acquired:
                return self._lock_value
            logger.info(f"lock not available: {self._lock_key}")
            return False
        logger.warning(f"Can't acquire lock, backing off: {self._lock_key}")
        return False

    def __exit__(self, *args, **kwargs):
        """
        Context-manager teardown: release the lock.

        Safe release: the key is deleted only when the stored value matches
        the token written by this object; otherwise we rely on the TTL-based
        auto-release.

        NOTE(review): the GET/compare/DELETE sequence is not atomic — if the
        lock expires between the GET and the DELETE, another agent's fresh
        lock could be removed. A server-side (Lua) script would make the
        release atomic; TODO confirm whether this window matters here.
        """
        try:
            stored_token = self._redis.get(self._lock_key)
            if stored_token and stored_token.decode() == self._lock_value:
                self._redis.delete(self._lock_key)
        except ConnectionError:
            logger.warning(
                f"connection error while deleting lock: {self._lock_key}."
                f"rely on auto-release after {self._lock_expiry} seconds"
            )

python/tests/test_lock_manager.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from unittest.mock import patch
2+
3+
import pytest
4+
5+
from feast_spark.lock_manager import JobOperation, JobOperationLock
6+
7+
job_hash = "dummy_hash"
8+
9+
10+
class MockRedis:
    """In-memory stand-in for a redis client: get / set(NX) / delete.

    Values are stored as UTF-8 encoded bytes, mirroring what redis-py
    returns from GET.
    """

    def __init__(self, cache=None):
        # BUG FIX: the previous default `cache=dict()` is a mutable default
        # argument, evaluated once at definition time — every MockRedis()
        # created without an explicit cache shared the SAME dict, leaking
        # keys between tests. Default to a fresh dict per instance instead.
        self.cache = {} if cache is None else cache

    def get(self, name):
        """Return the stored bytes for `name`, or None when absent."""
        if name in self.cache:
            return self.cache[name]
        return None

    def set(self, name, value, *args, **kwargs):
        """Mimic SET with NX semantics: only write when the key is absent.

        Returns "OK" on a successful write, None when the key already exists
        (matching redis-py's falsy result for a failed NX set).
        """
        if name not in self.cache:
            self.cache[name] = value.encode("utf-8")
            return "OK"

    def delete(self, name):
        """Remove `name` from the cache if present; always return None."""
        if name in self.cache:
            self.cache.pop(name)
        return None
28+
29+
30+
@pytest.fixture
def lock_config():
    """Connection/expiry parameters passed to JobOperationLock in tests."""
    return dict(redis_host="localhost", redis_port=0, lock_expiry=5)
33+
34+
35+
@patch("redis.Redis")
def test_lock_manager_context(mock_redis, lock_config):
    """Lock is acquired inside the context and released on exit."""
    fake_redis = MockRedis()
    mock_redis.return_value = fake_redis
    expected_key = f"lock_{JobOperation.START.value}_{job_hash}"
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # acquisition succeeded: a truthy lock token is returned
        assert lock
        # the lock key was written into the backing store
        assert expected_key in fake_redis.cache
    # leaving the context released the lock
    assert expected_key not in fake_redis.cache
52+
53+
54+
@patch("redis.Redis")
def test_lock_manager_lock_not_available(mock_redis, lock_config):
    """Acquisition fails when another agent already holds the lock."""
    held_by_other_agent = {"lock_st_dummy_hash": b"127a32aaf729dc87"}
    mock_redis.return_value = MockRedis(held_by_other_agent)
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # NX set refuses to overwrite, so no lock token is returned
        assert not lock
64+
65+
66+
@patch("feast_spark.lock_manager.time.sleep")
@patch("redis.Redis")
def test_lock_manager_connection_error(mock_redis, mock_sleep, lock_config):
    """Acquisition returns False after exhausting retries on connection errors.

    FIX: the previous version used no mocking at all — it relied on a real
    connection attempt to localhost:0 failing, making the test dependent on
    the environment and spending LOCK_ACQUIRE_RETRIES * LOCK_ACQUIRE_WAIT
    (~3) seconds in real sleeps. Here the connection error is injected and
    the backoff sleep is patched, so the test is deterministic and fast.
    """
    from redis.exceptions import ConnectionError as RedisConnectionError

    # every SET attempt behaves as if redis were unreachable
    mock_redis.return_value.set.side_effect = RedisConnectionError
    with JobOperationLock(
        job_hash=job_hash, operation=JobOperation.START, **lock_config
    ) as lock:
        # test lock not acquired
        assert not lock
    # one backoff sleep per retry attempt (LOCK_ACQUIRE_RETRIES == 3)
    assert mock_sleep.call_count == 3

python/tests/test_streaming_job_scheduling.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import hashlib
33
import json
44
from datetime import datetime
5-
from unittest.mock import Mock
5+
from unittest.mock import Mock, patch
66

77
import pytest
88

@@ -21,7 +21,11 @@
2121
def feast_client():
2222
c = FeastClient(
2323
job_service_pause_between_jobs=0,
24-
options={"whitelisted_projects": "default,ride"},
24+
options={
25+
"whitelisted_projects": "default,ride",
26+
"lock_mgr_redis_host": "localhost",
27+
"lock_mgr_redis_port": "0",
28+
},
2529
)
2630
c.list_projects = Mock(return_value=["default", "ride", "invalid_project"])
2731
c.list_feature_tables = Mock()
@@ -88,7 +92,8 @@ def get_start_time(self) -> datetime:
8892
pass
8993

9094

91-
def test_new_job_creation(spark_client, feature_table):
95+
@patch("redis.Redis")
96+
def test_new_job_creation(mock_redis, spark_client, feature_table):
9297
""" No job existed prior to call """
9398

9499
spark_client.feature_store.list_feature_tables.return_value = [feature_table]
@@ -118,7 +123,8 @@ def test_no_changes(spark_client, feature_table):
118123
spark_client.start_stream_to_online_ingestion.assert_not_called()
119124

120125

121-
def test_update_existing_job(spark_client, feature_table):
126+
@patch("redis.Redis")
127+
def test_update_existing_job(mock_redis, spark_client, feature_table):
122128
""" Feature Table spec was updated """
123129

124130
new_ft = copy.deepcopy(feature_table)
@@ -136,7 +142,8 @@ def test_update_existing_job(spark_client, feature_table):
136142
assert spark_client.start_stream_to_online_ingestion.call_count == 2
137143

138144

139-
def test_not_cancelling_starting_job(spark_client, feature_table):
145+
@patch("redis.Redis")
146+
def test_not_cancelling_starting_job(mock_redis, spark_client, feature_table):
140147
""" Feature Table spec was updated but previous version is still starting """
141148

142149
new_ft = copy.deepcopy(feature_table)
@@ -154,7 +161,8 @@ def test_not_cancelling_starting_job(spark_client, feature_table):
154161
assert spark_client.start_stream_to_online_ingestion.call_count == 2
155162

156163

157-
def test_not_retrying_failed_job(spark_client, feature_table):
164+
@patch("redis.Redis")
165+
def test_not_retrying_failed_job(mock_redis, spark_client, feature_table):
158166
""" Job has failed on previous try """
159167

160168
job = SimpleStreamingIngestionJob(
@@ -173,7 +181,8 @@ def test_not_retrying_failed_job(spark_client, feature_table):
173181
)
174182

175183

176-
def test_restarting_completed_job(spark_client, feature_table):
184+
@patch("redis.Redis")
185+
def test_restarting_completed_job(mock_redis, spark_client, feature_table):
177186
""" Job has successfully finished on previous try """
178187
job = SimpleStreamingIngestionJob(
179188
"", "default", feature_table, SparkJobStatus.COMPLETED
@@ -187,7 +196,8 @@ def test_restarting_completed_job(spark_client, feature_table):
187196
assert spark_client.start_stream_to_online_ingestion.call_count == 2
188197

189198

190-
def test_stopping_running_job(spark_client, feature_table):
199+
@patch("redis.Redis")
200+
def test_stopping_running_job(mock_redis, spark_client, feature_table):
191201
""" Streaming source was deleted """
192202
new_ft = copy.deepcopy(feature_table)
193203
new_ft.stream_source = None
@@ -205,13 +215,18 @@ def test_stopping_running_job(spark_client, feature_table):
205215
spark_client.start_stream_to_online_ingestion.assert_not_called()
206216

207217

208-
def test_restarting_failed_jobs(feature_table):
218+
@patch("redis.Redis")
219+
def test_restarting_failed_jobs(mock_redis, feature_table):
209220
""" If configured - restart failed jobs """
210221

211222
feast_client = FeastClient(
212223
job_service_pause_between_jobs=0,
213224
job_service_retry_failed_jobs=True,
214-
options={"whitelisted_projects": "default,ride"},
225+
options={
226+
"whitelisted_projects": "default,ride",
227+
"lock_mgr_redis_host": "localhost",
228+
"lock_mgr_redis_port": "0",
229+
},
215230
)
216231
feast_client.list_projects = Mock(return_value=["default"])
217232
feast_client.list_feature_tables = Mock()

0 commit comments

Comments
 (0)