forked from cosmicpython/code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_repository.py
More file actions
96 lines (80 loc) · 3.14 KB
/
test_repository.py
File metadata and controls
96 lines (80 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# pylint: disable=protected-access
import pytest
from domain import model
from adapters import repository
from typing import Callable
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
# Zero-argument callable that returns an AsyncSession; the tests below accept
# one as their ``session_maker`` argument and open each session with
# ``async with session_maker() as session``.
AsyncSessionMaker = Callable[[], AsyncSession]
@pytest.mark.asyncio
async def test_repository_can_save_a_batch(session_maker: AsyncSessionMaker) -> None:
    """A batch added through the repository shows up as a row in ``batches``."""
    new_batch = model.Batch("batch1", "RUSTY-SOAPDISH", 100, eta=None)

    # Write through the repository using one session ...
    async with session_maker() as write_session:
        await repository.SqlAlchemyRepository(write_session).add(new_batch)

    # ... then read the table back with raw SQL from a separate session, so
    # the check does not go through the repository being tested.
    query = text('SELECT ref, sku, _purchased_quantity, eta FROM "batches"')
    async with session_maker() as read_session, read_session.begin():
        result = await read_session.execute(query)
        stored_rows = list(result)
        assert stored_rows == [("batch1", "RUSTY-SOAPDISH", 100, None)]
async def insert_order_line(session: AsyncSession) -> int:
    """Insert a fixed order line with raw SQL and return its ``id``.

    The inserted row is ``orderid="order1"``, ``sku="GENERIC-SOFA"``,
    ``qty=12``. The values are sent as bound parameters instead of
    double-quoted SQL literals: standard SQL reads double quotes as
    identifiers, and only SQLite's legacy fallback treats them as strings.

    Args:
        session: Session the statements are executed on. Nothing is
            committed here; the caller controls the transaction.

    Returns:
        The ``id`` column of the ``order_lines`` row matching the inserted
        order id and SKU.
    """
    orderid = "order1"
    sku = "GENERIC-SOFA"
    await session.execute(
        text(
            "INSERT INTO order_lines (orderid, sku, qty)"
            " VALUES (:orderid, :sku, :qty)"
        ),
        {"orderid": orderid, "sku": sku, "qty": 12},
    )
    # The nested unpacking requires exactly one row holding exactly one column.
    [[orderline_id]] = await session.execute(
        text("SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku"),
        {"orderid": orderid, "sku": sku},
    )
    return int(orderline_id)
async def insert_batch(session: AsyncSession, batch_ref: str) -> int:
    """Insert a ``GENERIC-SOFA`` batch with raw SQL and return its ``id``.

    The row gets ``_purchased_quantity=100`` and a NULL ``eta``. The SKU is
    sent as a bound parameter instead of a double-quoted SQL literal:
    standard SQL reads double quotes as identifiers, and only SQLite's
    legacy fallback treats them as strings.

    Args:
        session: Session the statements are executed on. Nothing is
            committed here; the caller controls the transaction.
        batch_ref: Value stored in the ``ref`` column of the new batch.

    Returns:
        The ``id`` column of the ``batches`` row matching ``batch_ref`` and
        the SKU.
    """
    # Both statements use the same two bind names, so one mapping serves both.
    params = {"batch_ref": batch_ref, "sku": "GENERIC-SOFA"}
    await session.execute(
        text(
            "INSERT INTO batches (ref, sku, _purchased_quantity, eta)"
            " VALUES (:batch_ref, :sku, 100, null)"
        ),
        params,
    )
    # The nested unpacking requires exactly one row holding exactly one column.
    [[batch_id]] = await session.execute(
        text("SELECT id FROM batches WHERE ref=:batch_ref AND sku=:sku"),
        params,
    )
    return int(batch_id)
async def insert_allocation(
    session: AsyncSession, orderline_id: int, batch_id: int
) -> None:
    """Insert one ``allocations`` row linking an order line to a batch.

    Args:
        session: Session the INSERT is executed on; nothing is committed here.
        orderline_id: Value for the ``orderline_id`` column.
        batch_id: Value for the ``batch_id`` column.
    """
    statement = text(
        "INSERT INTO allocations (orderline_id, batch_id)"
        " VALUES (:orderline_id, :batch_id)"
    )
    bind_values = {"orderline_id": orderline_id, "batch_id": batch_id}
    await session.execute(statement, bind_values)
@pytest.mark.asyncio
async def test_repository_can_retrieve_a_batch_with_allocations(
    session_maker: AsyncSessionMaker,
) -> None:
    """``get`` returns the stored batch together with its allocated order line."""
    # Arrange: seed the tables with raw SQL inside one transaction, so the
    # test does not depend on the repository's own write path.
    async with session_maker() as session:
        async with session.begin():
            orderline_id = await insert_order_line(session)
            batch1_id = await insert_batch(session, "batch1")
            # Second batch with no allocation; only "batch1" is fetched below.
            await insert_batch(session, "batch2")
            await insert_allocation(session, orderline_id, batch1_id)

    # Act / assert: load through the repository in a fresh session. The
    # attribute checks run while that session is still open.
    async with session_maker() as session:
        repo = repository.SqlAlchemyRepository(session)
        retrieved = await repo.get("batch1")

        expected = model.Batch("batch1", "GENERIC-SOFA", 100, eta=None)
        assert retrieved == expected  # Batch.__eq__ only compares ref
        assert retrieved.sku == expected.sku
        assert retrieved._purchased_quantity == expected._purchased_quantity
        assert retrieved._allocations == {
            model.OrderLine("order1", "GENERIC-SOFA", 12),
        }