-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_command_app.py
More file actions
135 lines (110 loc) · 4.01 KB
/
test_command_app.py
File metadata and controls
135 lines (110 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
# Make sure an "svg" directory exists in the working directory (no error if
# it is already there).
# NOTE(review): presumably the application expects this directory when it is
# imported below - confirm.
os.makedirs("svg", exist_ok=True)
# Set before the openscada_lite imports that follow.
# NOTE(review): presumably the configuration path is read at import time,
# which is why this assignment precedes those imports - confirm.
os.environ["SCADA_CONFIG_PATH"] = "tests"
import asyncio
import pytest
import threading
import time
import socketio
import requests
from openscada_lite.common.bus.event_bus import EventBus
from openscada_lite.modules.command.service import CommandService
from openscada_lite.common.config.config import Config
from openscada_lite.common.models.dtos import CommandFeedbackMsg, SendCommandMsg
from openscada_lite.modules.command.model import CommandModel
from openscada_lite.app import app, socketio as flask_socketio
# Base URL used by the Socket.IO client and the HTTP requests in this module.
SERVER_URL = "http://localhost:5000"
@pytest.fixture(autouse=True)
def reset_event_bus(monkeypatch):
    """Clear the cached ``EventBus`` singleton before every test.

    The change goes through ``monkeypatch``, so pytest puts the previous
    value of ``EventBus._instance`` back when the test finishes.
    """
    monkeypatch.setattr(EventBus, "_instance", value=None, raising=True)
@pytest.fixture(scope="module", autouse=True)
def run_server():
    """Serve the Flask-SocketIO app from a daemon thread for this module.

    The server is started once per module on the port named in
    ``SERVER_URL``. Instead of sleeping for a fixed interval, the fixture
    polls the port until it accepts a TCP connection, so tests start as soon
    as the server is reachable and slow machines get more than one second.

    If the server is still unreachable when the startup budget runs out, the
    fixture yields anyway: tests that do not talk to the server keep working
    and the ones that do will fail on their own connection attempt.
    """
    import socket
    from urllib.parse import urlsplit

    startup_timeout = 10.0  # seconds allowed for the server to come up
    probe_interval = 0.1  # pause between connection attempts

    endpoint = urlsplit(SERVER_URL)
    host, port = endpoint.hostname, endpoint.port

    # Start the Flask app in a background thread
    thread = threading.Thread(
        target=lambda: flask_socketio.run(
            app, port=port, allow_unsafe_werkzeug=True
        ),
        daemon=True,
    )
    # Replace background-task scheduling with an inline call.
    # NOTE(review): this patch is never reverted, so it stays in effect for
    # the rest of the pytest session - confirm that is intended.
    flask_socketio.start_background_task = immediate_call
    thread.start()

    # Wait until the port accepts connections. Stop early if the server
    # thread has already exited, since nothing new will start listening.
    deadline = time.monotonic() + startup_timeout
    while thread.is_alive() and time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                break
        except OSError:
            time.sleep(probe_interval)

    yield
    # No explicit shutdown; daemon thread will exit with pytest
def immediate_call(func, *args, **kwargs):
    """Run ``func`` synchronously in the calling thread and return its result.

    Used as a stand-in for background-task scheduling so that work happens
    inline and is finished by the time this function returns.

    Args:
        func: Any callable, either a plain function or a coroutine function.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. If the call produces a coroutine object,
        the coroutine is driven to completion with ``asyncio.run`` and its
        result is returned instead.

    Raises:
        RuntimeError: Propagated from ``asyncio.run`` when a coroutine has to
            be executed while an event loop is already running in this
            thread.
    """
    import asyncio
    import inspect

    result = func(*args, **kwargs)
    # Inspect the returned object rather than the callable: this also covers
    # lambdas and wrappers that merely return a coroutine, which would
    # otherwise be dropped without ever being awaited.
    if inspect.iscoroutine(result):
        return asyncio.run(result)
    return result
@pytest.mark.asyncio
async def test_command_live_feed_and_feedback():
    """Send a command over HTTP and expect feedback on the live feed.

    A Socket.IO client subscribes to the command live feed, a command is
    posted to the HTTP endpoint, and the test then checks that a
    ``command_commandfeedbackmsg`` event carrying the same command id and
    datapoint identifier reaches the client.
    """
    feedback_timeout = 5.0  # seconds to wait for the feedback event
    poll_interval = 0.05

    sio = socketio.Client()
    received_initial = []
    received_feedback = []

    @sio.on("command_initial_state")
    def on_initial_state(data):
        received_initial.append(data)

    @sio.on("command_commandfeedbackmsg")
    def on_command_feedback(data):
        received_feedback.append(data)

    sio.connect(SERVER_URL)
    try:
        sio.emit("command_subscribe_live_feed")
        await asyncio.sleep(1)  # Wait for initial state

        # NOTE(review): this check can never fail because the list itself is
        # never None. Tighten it to `assert received_initial` once it is
        # confirmed that the server always emits "command_initial_state" on
        # subscribe.
        assert received_initial is not None

        # Send a command
        test_command = SendCommandMsg(
            command_id="testcmd1",
            datapoint_identifier="WaterTank@TANK",
            value=42,
        )
        response = requests.post(
            f"{SERVER_URL}/command_send_sendcommandmsg",
            json=test_command.to_dict(),
            timeout=10,  # never hang the test run on an unresponsive server
        )
        assert response.status_code == 200

        # The feedback event is delivered on the client's own thread and may
        # arrive after the HTTP response, so wait for it instead of asserting
        # straight away.
        deadline = time.monotonic() + feedback_timeout
        while not received_feedback and time.monotonic() < deadline:
            await asyncio.sleep(poll_interval)

        assert received_feedback, "No command feedback received after sending command"
        feedback = received_feedback[-1]
        assert feedback["command_id"] == "testcmd1"
        assert feedback["datapoint_identifier"] == "WaterTank@TANK"
        # Optionally check feedback['feedback'] or other fields
    finally:
        # Always close the connection, including when an assertion fails.
        sio.disconnect()
def test_command_model_initial_load(monkeypatch):
    """A new ``CommandModel`` holds one empty entry per allowed command.

    ``Config.get_instance`` is replaced with a stub whose
    ``get_allowed_command_identifiers`` returns a fixed list. Every
    identifier in that list must then appear in the model's ``_store`` with a
    matching ``datapoint_identifier`` and no feedback.
    """
    expected_ids = ["WaterTank@CMD1", "AuxServer@CMD2"]

    class _StubConfig:
        """Minimal replacement exposing only the allowed identifiers."""

        def get_allowed_command_identifiers(self):
            return expected_ids

    monkeypatch.setattr(Config, "get_instance", lambda: _StubConfig())

    model = CommandModel()
    store = model._store

    for identifier in expected_ids:
        assert identifier in store
        entry = store[identifier]
        assert entry.datapoint_identifier == identifier
        assert entry.feedback is None
@pytest.mark.asyncio
async def test_command_service_handle_bus_message(monkeypatch):
    """``CommandService.handle_bus_message`` stores the feedback in the model.

    With ``Config.get_instance`` stubbed to allow a single command
    identifier, a ``CommandFeedbackMsg`` for that identifier is handed to the
    service. The model's ``_store`` must afterwards hold the message's
    command id, value and feedback under that identifier.
    """
    datapoint = "WaterTank@CMD1"
    permitted = [datapoint]

    class _StubConfig:
        """Minimal replacement exposing only the allowed identifiers."""

        def get_allowed_command_identifiers(self):
            return permitted

    monkeypatch.setattr(Config, "get_instance", lambda: _StubConfig())

    bus = EventBus.get_instance()
    model = CommandModel()
    service = CommandService(bus, model, controller=None)

    feedback_msg = CommandFeedbackMsg(
        command_id="cmd123",
        datapoint_identifier=datapoint,
        value=42,
        feedback="OK",
        timestamp=None,
    )
    await service.handle_bus_message(feedback_msg)

    stored = model._store.get(datapoint)
    assert stored is not None
    assert (stored.command_id, stored.value, stored.feedback) == ("cmd123", 42, "OK")