-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathclient_test.py
More file actions
165 lines (106 loc) · 5.31 KB
/
client_test.py
File metadata and controls
165 lines (106 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from datetime import datetime, timezone
from unittest.mock import MagicMock, Mock, patch
import pytest
from riverqueue import Client, InsertOpts, UniqueOpts
from riverqueue.driver import DriverProtocol, ExecutorProtocol
import sqlalchemy
from tests.simple_args import SimpleArgs
@pytest.fixture
def mock_driver() -> DriverProtocol:
return MagicMock(spec=DriverProtocol)
@pytest.fixture
def mock_exec(mock_driver) -> ExecutorProtocol:
def mock_context_manager(val) -> Mock:
context_manager_mock = MagicMock()
context_manager_mock.__enter__.return_value = val
context_manager_mock.__exit__.return_value = Mock()
return context_manager_mock
# def mock_context_manager(val) -> Mock:
# return Mock(__enter__=val, __exit__=Mock())
mock_exec = MagicMock(spec=ExecutorProtocol)
mock_driver.executor.return_value = mock_context_manager(mock_exec)
return mock_exec
@pytest.fixture
def client(mock_driver) -> Client:
return Client(mock_driver)
def test_insert_with_only_args(client, mock_exec):
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs())
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
def test_insert_tx(mock_driver, client):
mock_exec = MagicMock(spec=ExecutorProtocol)
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
mock_tx = MagicMock(spec=sqlalchemy.Transaction)
def mock_unwrap_executor(tx: sqlalchemy.Transaction):
assert tx == mock_tx
return mock_exec
mock_driver.unwrap_executor.side_effect = mock_unwrap_executor
insert_res = client.insert_tx(mock_tx, SimpleArgs())
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
def test_insert_with_opts(client, mock_exec):
insert_opts = InsertOpts(queue="high_priority")
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
# Check that the InsertOpts were correctly passed to make_insert_params
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.queue == "high_priority"
def test_insert_with_unique_opts_by_args(client, mock_exec):
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True))
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
# Check that the UniqueOpts were correctly processed
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.kind == "simple"
@patch("datetime.datetime")
def test_insert_with_unique_opts_by_period(mock_datetime, client, mock_exec):
mock_datetime.now.return_value = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900))
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
# Check that the UniqueOpts were correctly processed
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.kind == "simple"
def test_insert_with_unique_opts_by_queue(client, mock_exec):
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True))
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
# Check that the UniqueOpts were correctly processed
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.kind == "simple"
def test_insert_with_unique_opts_by_state(client, mock_exec):
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_state=["available", "running"]))
mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"
insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"
# Check that the UniqueOpts were correctly processed
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.kind == "simple"
def test_check_advisory_lock_prefix_bounds():
Client(mock_driver, advisory_lock_prefix=123)
with pytest.raises(OverflowError) as ex:
Client(mock_driver, advisory_lock_prefix=-1)
assert "can't convert negative int to unsigned" == str(ex.value)
# 2^32-1 is 0xffffffff (1s for 32 bits) which fits
Client(mock_driver, advisory_lock_prefix=2**32 - 1)
# 2^32 is 0x100000000, which does not
with pytest.raises(OverflowError) as ex:
Client(mock_driver, advisory_lock_prefix=2**32)
assert "int too big to convert" == str(ex.value)