-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_datapoint_controller.py
More file actions
181 lines (149 loc) · 5.96 KB
/
test_datapoint_controller.py
File metadata and controls
181 lines (149 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from asyncio import sleep
import pytest
import datetime
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch, MagicMock
from fastapi import APIRouter, FastAPI
from openscada_lite.modules.security.service import SecurityService
from openscada_lite.common.models.dtos import TagUpdateMsg
from openscada_lite.common.config.config import Config
from openscada_lite.modules.datapoint.model import DatapointModel
from openscada_lite.modules.datapoint.controller import DatapointController
@pytest.fixture(autouse=True)
def reset_config_singleton():
    """Reset the Config singleton, then request it again with the test config path.

    Declared ``autouse`` so it applies to every test in this module without
    being requested explicitly.
    """
    test_config_path = "tests/config/test_config.json"
    # Drop any previously created instance before asking for a new one.
    Config.reset_instance()
    Config.get_instance(test_config_path)
def setup_function():
    """Clear the Config singleton.

    The name matches pytest's xunit-style per-function setup hook.
    """
    Config.reset_instance()
@pytest.fixture
def model():
    """Return a DatapointModel whose store already contains the tag ``Test@TAG``."""
    tag_id = "Test@TAG"
    datapoint_model = DatapointModel()
    seeded_tag = TagUpdateMsg(
        datapoint_identifier=tag_id,
        value=123,
        quality="good",
        timestamp="2025-09-15T12:00:00Z",
    )
    # Written directly into the model's private store so tests start from a
    # known initial state.
    datapoint_model._store[tag_id] = seeded_tag
    return datapoint_model
@pytest.fixture
def socketio_mock():
    """Return a MagicMock socketio stand-in with awaitable ``join_room`` and ``emit``."""
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return MagicMock(join_room=AsyncMock(), emit=AsyncMock())
@pytest.fixture
def controller(model, socketio_mock):
    """Return a DatapointController built from the model and socketio fixtures.

    The controller is created for the ``"datapoint"`` module with a fresh
    APIRouter, and ``register_socketio()`` is called before it is returned.
    """
    datapoint_controller = DatapointController(
        model, socketio_mock, "datapoint", APIRouter()
    )
    datapoint_controller.register_socketio()
    return datapoint_controller
def test_publish_tag_blocks_when_initializing(controller):
    """publish() must not emit while a client id is in ``_initializing_clients``."""
    controller._initializing_clients.add("sid1")
    emit_mock = controller.socketio.emit
    emit_mock.reset_mock()

    update = TagUpdateMsg(
        datapoint_identifier="Test@TAG",
        value=456,
        quality="good",
        timestamp=datetime.datetime.now(),
    )
    controller.publish(update)

    # NOTE(review): this check runs immediately after publish(); the sibling
    # "emits" test has to wait about a second before emit is observed, so
    # confirm this assertion would really fail if blocking were broken.
    emit_mock.assert_not_called()
@pytest.mark.asyncio
async def test_publish_tag_emits_when_no_initializing(controller):
    """With no initializing clients, a published tag is emitted exactly once."""
    emit_mock = controller.socketio.emit
    emit_mock.reset_mock()  # start from a clean call history

    update = TagUpdateMsg(
        datapoint_identifier="Test@TAG",
        value=456,
        quality="good",
        timestamp=datetime.datetime.now(),
    )
    controller.publish(update)

    # Give the controller time to flush; 1.1 s is slightly longer than the
    # 1 second batch interval the original comment refers to.
    await sleep(1.1)

    emit_mock.assert_called_once()
    positional, _kwargs = emit_mock.call_args
    assert positional[0] == "datapoint_tagupdatemsg"
    assert any(
        entry["datapoint_identifier"] == "Test@TAG" and entry["value"] == 456
        for entry in positional[1]
    )
@pytest.mark.asyncio
async def test_handle_subscribe_live_feed_emits_initial_state(controller):
    """Subscribing to the live feed joins the room and emits the stored initial state."""
    with patch.object(
        controller.socketio, "enter_room", new_callable=AsyncMock
    ) as mock_enter_room:
        subscribe_event = f"{controller.base_event}_subscribe_live_feed"

        async def dispatch(event_name, *args, **kwargs):
            # Stand-in for socketio.trigger_event: only the subscribe event is
            # routed, straight to the controller's handler.
            if event_name == subscribe_event:
                await controller.handle_subscribe_live_feed(*args, **kwargs)

        controller.socketio.trigger_event = dispatch
        emit_mock = controller.socketio.emit
        emit_mock.reset_mock()

        await controller.socketio.trigger_event(subscribe_event, "sid1")

        # The subscribing client must have been placed in the datapoint room.
        mock_enter_room.assert_called_once_with("sid1", "datapoint_room")

        # Every initial-state emit has to carry the seeded test tag, and at
        # least one such emit must exist.
        initial_state_emits = 0
        for positional, _kwargs in emit_mock.call_args_list:
            if positional[0] != "datapoint_initial_state":
                continue
            assert any(
                entry["datapoint_identifier"] == "Test@TAG" and entry["value"] == 123
                for entry in positional[1]
            )
            initial_state_emits += 1
        assert initial_state_emits > 0, "No initial_state emit found"
@pytest.mark.asyncio
async def test_set_tag_calls_service_and_emits_ack(controller):
    """POSTing a raw tag update returns an "ok" body and forwards it to the service.

    A dedicated DatapointController is built on its own router so the request
    can be served by a throwaway FastAPI app; the ``controller`` fixture is
    requested but not used directly.

    The SecurityService singleton is replaced only for the duration of the
    request and restored afterwards, so the permissive mock cannot leak into
    tests that run later in the same session.
    """
    import json

    from openscada_lite.common.models.dtos import RawTagUpdateMsg

    # Build the app around a controller whose service is fully mocked.
    app = FastAPI()
    router = APIRouter()
    test_controller = DatapointController(MagicMock(), MagicMock(), "datapoint", router)
    test_controller.service = MagicMock()
    test_controller.service.handle_controller_message = AsyncMock(return_value=True)
    app.include_router(router)

    test_data = RawTagUpdateMsg(
        track_id="1234",
        datapoint_identifier="Test@TAG",
        value=42,
        quality="good",
        timestamp=datetime.datetime.now(),
    )

    mock_security_service = MagicMock()
    mock_security_service.is_allowed.return_value = True

    # create=True keeps this working even if the class has not defined
    # ``_instance`` yet; patch.object undoes the override on exit either way.
    with patch.object(
        SecurityService, "_instance", mock_security_service, create=True
    ):
        client = TestClient(app)
        response = client.post(
            "/datapoint/rawtagupdatemsg",
            content=json.dumps(test_data.to_dict()),  # 'content', not 'data'
            headers={"Content-Type": "application/json"},
        )

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
    assert body["reason"] == "Request accepted."
    assert "data" in body

    # The service must have received a message equal to the one that was sent.
    expected_data = RawTagUpdateMsg(
        track_id="1234",
        datapoint_identifier="Test@TAG",
        value=42,
        quality="good",
        timestamp=test_data.timestamp,
    )
    test_controller.service.handle_controller_message.assert_called_once_with(expected_data)