-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_lock_manager.py
More file actions
71 lines (55 loc) · 1.93 KB
/
test_lock_manager.py
File metadata and controls
71 lines (55 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from unittest.mock import patch
import pytest
from feast_spark.lock_manager import JobOperation, JobOperationLock
job_hash = "dummy_hash"
class MockRedis:
def __init__(self, cache=dict()):
self.cache = cache
def get(self, name):
if name in self.cache:
return self.cache[name]
return None
def set(self, name, value, *args, **kwargs):
if name not in self.cache:
self.cache[name] = value.encode("utf-8")
return "OK"
def delete(self, name):
if name in self.cache:
self.cache.pop(name)
return None
@pytest.fixture
def lock_config():
return {"redis_host": "localhost", "redis_port": 0, "lock_expiry": 5}
@patch("redis.Redis")
def test_lock_manager_context(mock_redis, lock_config):
mock_redis_connection = MockRedis()
mock_redis.return_value = mock_redis_connection
with JobOperationLock(
job_hash=job_hash, operation=JobOperation.START, **lock_config
) as lock:
# test lock acquired
assert lock
# verify lock key in cache
assert (
f"lock_{JobOperation.START.value}_{job_hash}" in mock_redis_connection.cache
)
# verify release
assert (
f"lock_{JobOperation.START.value}_{job_hash}" not in mock_redis_connection.cache
)
@patch("redis.Redis")
def test_lock_manager_lock_not_available(mock_redis, lock_config):
cache = {"lock_st_dummy_hash": b"127a32aaf729dc87"}
mock_redis_connection = MockRedis(cache)
mock_redis.return_value = mock_redis_connection
with JobOperationLock(
job_hash=job_hash, operation=JobOperation.START, **lock_config
) as lock:
# test lock not acquired
assert not lock
def test_lock_manager_connection_error(lock_config):
with JobOperationLock(
job_hash=job_hash, operation=JobOperation.START, **lock_config
) as lock:
# test lock not acquired
assert not lock