-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_datapoint_manager.py
More file actions
106 lines (83 loc) · 3.51 KB
/
test_datapoint_manager.py
File metadata and controls
106 lines (83 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import pytest
import asyncio
import datetime
from openscada_lite.common.config.config import Config
from openscada_lite.modules.datapoint.model import DatapointModel
from openscada_lite.modules.datapoint.service import DatapointService
from openscada_lite.common.bus.event_bus import EventBus
from openscada_lite.common.bus.event_types import EventType
from openscada_lite.common.models.dtos import RawTagUpdateMsg, TagUpdateMsg
@pytest.fixture(autouse=True)
def reset_config_singleton():
Config.reset_instance()
Config.get_instance("tests/test_config.json")
def setup_function():
Config.reset_instance()
#Reset the bus for each test
@pytest.fixture(autouse=True)
def reset_event_bus(monkeypatch):
# Reset the singleton before each test
monkeypatch.setattr(EventBus, "_instance", None)
@pytest.mark.asyncio
async def test_update_tag_quality_and_timestamp():
bus = EventBus.get_instance()
dp_engine = DatapointService(bus, DatapointModel(), None)
results = []
async def capture(msg: TagUpdateMsg):
results.append(msg)
bus.subscribe(EventType.TAG_UPDATE, capture)
now = datetime.datetime.now()
await dp_engine.handle_controller_message(RawTagUpdateMsg("WaterTank@TANK", 55.5, "bad", now))
tag = dp_engine.model.get("WaterTank@TANK")
assert tag.value == 55.5
assert tag.quality == "bad"
assert tag.timestamp == now
await asyncio.sleep(0.01)
assert results[0].quality == "bad"
assert results[0].timestamp == now
@pytest.mark.asyncio
async def test_get_tag_returns_none_for_missing_tag():
bus = EventBus.get_instance()
dp_engine = DatapointService(bus, DatapointModel(), None)
tag = dp_engine.model.get("AuxServer@NON_EXISTENT")
assert tag is None
@pytest.mark.asyncio
async def test_multiple_tag_updates():
bus = EventBus.get_instance()
dp_engine = DatapointService(bus, DatapointModel(), None)
results = []
async def capture(msg: TagUpdateMsg):
results.append(msg)
bus.subscribe(EventType.TAG_UPDATE, capture)
await dp_engine.handle_controller_message(RawTagUpdateMsg("WaterTank@TANK", 10, "good", None))
await dp_engine.handle_controller_message(RawTagUpdateMsg("WaterTank@TANK", 20, "good", None))
await dp_engine.handle_controller_message(RawTagUpdateMsg("WaterTank@TANK", 15, "good", None))
await asyncio.sleep(0.01)
assert dp_engine.model.get("WaterTank@TANK").value == 15
assert results[0].value == 10
assert results[1].value == 20
assert results[2].value == 15
@pytest.mark.asyncio
async def test_update_tag_without_optional_fields():
bus = EventBus.get_instance()
dp_engine = DatapointService(bus, DatapointModel(), None)
results = []
async def capture(msg: TagUpdateMsg):
results.append(msg)
bus.subscribe(EventType.TAG_UPDATE, capture)
await dp_engine.handle_controller_message(RawTagUpdateMsg("WaterTank@TANK", 99, "good", None))
tag = dp_engine.model.get("WaterTank@TANK")
assert tag.value == 99
assert tag.quality == "good" # Default quality
assert tag.timestamp is None
await asyncio.sleep(0.01)
assert results[0].value == 99
@pytest.mark.asyncio
async def test_update_invalid_tag():
bus = EventBus.get_instance()
dp_engine = DatapointService(bus, DatapointModel(), None)
# Attempt to update a tag not in config
await dp_engine.handle_controller_message(RawTagUpdateMsg("ServerX.UNKNOWN_TAG", 42, "bad", None))
# Should not appear in internal state
tag = dp_engine.model.get("ServerX.UNKNOWN_TAG")
assert tag is None