forked from tortoise/tortoise-orm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_two_databases.py
More file actions
98 lines (84 loc) · 4.43 KB
/
test_two_databases.py
File metadata and controls
98 lines (84 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from tests.testmodels import Event, EventTwo, TeamTwo, Tournament
from tortoise import Tortoise, connections
from tortoise.backends.oracle import OracleClient
from tortoise.contrib import test
from tortoise.exceptions import OperationalError, ParamsError
from tortoise.transactions import in_transaction
class TestTwoDatabases(test.SimpleTestCase):
    """Tests that run Tortoise with two connections, "models" and "events", at once.

    Every test gets freshly created databases (see ``asyncSetUp``), which is
    why the assertions below can rely on hard-coded primary keys such as ``1``.
    """

    async def asyncSetUp(self) -> None:
        """Initialise Tortoise with a merged two-connection config and build schemas."""
        await super().asyncSetUp()
        # Drop any state left by an earlier initialisation before re-initialising.
        if Tortoise._inited:
            await self._tearDownDB()
        first_db_config = test.getDBConfig(app_label="models", modules=["tests.testmodels"])
        second_db_config = test.getDBConfig(app_label="events", modules=["tests.testmodels"])
        # Both configs share the same layout, so merging the two top-level
        # mappings yields a single config holding both connections and both apps.
        merged_config = {
            "connections": {**first_db_config["connections"], **second_db_config["connections"]},
            "apps": {**first_db_config["apps"], **second_db_config["apps"]},
        }
        await Tortoise.init(merged_config, _create_db=True)
        await Tortoise.generate_schemas()
        self.db = connections.get("models")
        self.second_db = connections.get("events")

    async def asyncTearDown(self) -> None:
        """Drop the databases created for the test, then run the parent teardown."""
        try:
            await Tortoise._drop_databases()
        finally:
            # Run the parent cleanup even when dropping the databases fails, so
            # a failure here does not leak state into the following tests.
            await super().asyncTearDown()

    async def _select_all_eventtwo(self, client):
        """Run ``SELECT *`` on the ``eventtwo`` table through ``client``.

        The table name is double-quoted when ``client`` is an ``OracleClient``
        and left bare otherwise (presumably because of Oracle's identifier
        case handling -- confirm against the Oracle backend).

        Returns whatever ``client.execute_query`` returns; callers unpack it
        as a two-item tuple whose second item is the list of rows.
        """
        if isinstance(client, OracleClient):
            query = 'SELECT * FROM "eventtwo"'
        else:
            query = "SELECT * FROM eventtwo"
        return await client.execute_query(query)

    async def _assert_eventtwo_only_in_second_db(self) -> None:
        """Check the ``eventtwo`` row is readable via "events" but not via "models"."""
        # Querying the table through the "models" connection must fail.
        with self.assertRaises(OperationalError):
            await self._select_all_eventtwo(self.db)

        _, results = await self._select_all_eventtwo(self.second_db)
        self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

    async def test_two_databases(self) -> None:
        """An ``EventTwo`` row is stored on the "events" connection only."""
        tournament = await Tournament.create(name="Tournament")
        await EventTwo.create(name="Event", tournament_id=tournament.id)

        await self._assert_eventtwo_only_in_second_db()

    async def test_two_databases_relation(self) -> None:
        """Many-to-many participants of an ``EventTwo`` can be added and read back."""
        tournament = await Tournament.create(name="Tournament")
        event = await EventTwo.create(name="Event", tournament_id=tournament.id)

        await self._assert_eventtwo_only_in_second_db()

        teams = []
        for i in range(2):
            team = await TeamTwo.create(name=f"Team {(i + 1)}")
            teams.append(team)
            await event.participants.add(team)

        expected_values = [{"id": 1, "name": "Team 1"}, {"id": 2, "name": "Team 2"}]

        self.assertEqual(await TeamTwo.all().order_by("name"), teams)
        self.assertEqual(await event.participants.all().order_by("name"), teams)

        self.assertEqual(
            await TeamTwo.all().order_by("name").values("id", "name"),
            expected_values,
        )
        self.assertEqual(
            await event.participants.all().order_by("name").values("id", "name"),
            expected_values,
        )

    async def test_two_databases_transactions_switch_db(self) -> None:
        """Consecutive transactions on different connections both persist their rows."""
        async with in_transaction("models"):
            tournament = await Tournament.create(name="Tournament")
            await Event.create(name="Event1", tournament=tournament)
        async with in_transaction("events"):
            event = await EventTwo.create(name="Event2", tournament_id=tournament.id)
            team = await TeamTwo.create(name="Team 1")
            await event.participants.add(team)

        saved_tournament = await Tournament.filter(name="Tournament").first()
        self.assertEqual(tournament.id, saved_tournament.id)
        saved_event = await EventTwo.filter(tournament_id=tournament.id).first()
        self.assertEqual(event.id, saved_event.id)

    async def test_two_databases_transaction_paramerror(self) -> None:
        """``in_transaction()`` without a connection name raises ``ParamsError``."""
        with self.assertRaisesRegex(
            ParamsError,
            "You are running with multiple databases, so you should specify connection_name",
        ):
            async with in_transaction():
                pass