-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_datapoint_app.py
More file actions
90 lines (74 loc) · 2.75 KB
/
test_datapoint_app.py
File metadata and controls
90 lines (74 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
# NOTE: this assignment deliberately precedes the application import below.
# Presumably openscada_lite reads SCADA_CONFIG_PATH at import time, so the
# order must not be "fixed" by an import sorter -- TODO confirm.
os.environ["SCADA_CONFIG_PATH"] = "tests"
import asyncio
import pytest
import threading
import time
import socketio
# `socketio` above is the client package used by the test; the server-side
# object exported by the app is aliased to `flask_socketio` to avoid a clash.
from openscada_lite.app import app, socketio as flask_socketio
from common.models.dtos import RawTagUpdateMsg
import requests
# Base URL used by the Socket.IO and HTTP clients; the run_server fixture
# starts the app on port 5000.
SERVER_URL = "http://localhost:5000"
@pytest.fixture(autouse=True)
def ensure_svg_folder_exists():
    """Make sure the ``config/svg`` directory exists before each test.

    The path is resolved against the current working directory. Creation is
    done in a single ``makedirs(..., exist_ok=True)`` call rather than a
    check-then-create pair, so concurrent test processes cannot race between
    the existence check and the creation.

    Raises:
        FileExistsError: If ``config/svg`` exists but is not a directory.
    """
    svg_dir = os.path.abspath("config/svg")
    os.makedirs(svg_dir, exist_ok=True)
@pytest.fixture(scope="module", autouse=True)
def run_server():
    """Start the app server in a daemon thread once per test module.

    Before the server thread is started, ``flask_socketio.start_background_task``
    is replaced with ``immediate_call`` so that background tasks scheduled by
    the app run synchronously. The replacement is not undone on teardown.

    Instead of sleeping for a fixed interval, the fixture polls the TCP port
    until the server accepts a connection. This removes needless waiting on
    fast machines and spurious failures on slow ones.

    Raises:
        RuntimeError: If the server thread exits or the port does not accept
            connections within the startup timeout.
    """
    import socket  # local import: only needed for the readiness probe

    port = 5000
    startup_timeout = 10.0  # seconds to wait for the server to come up
    poll_interval = 0.05  # seconds between connection attempts

    # Start the Flask app in a background thread
    thread = threading.Thread(
        target=lambda: flask_socketio.run(app, port=port, allow_unsafe_werkzeug=True),
        daemon=True,
    )
    flask_socketio.start_background_task = immediate_call
    thread.start()

    # Wait until the server is actually listening rather than guessing a delay.
    deadline = time.monotonic() + startup_timeout
    while True:
        try:
            with socket.create_connection(("localhost", port), timeout=0.5):
                break
        except OSError:
            if not thread.is_alive():
                raise RuntimeError(
                    f"Test server thread exited before listening on port {port}"
                )
            if time.monotonic() >= deadline:
                raise RuntimeError(
                    f"Test server did not start listening on port {port} "
                    f"within {startup_timeout} seconds"
                )
            time.sleep(poll_interval)

    yield
    # No explicit shutdown; daemon thread will exit with pytest
def immediate_call(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` synchronously, driving coroutines to completion.

    Used as a stand-in for a "start background task" hook so that work which
    would normally be scheduled in the background executes immediately.

    The callable is invoked first and the *result* is inspected. This covers
    ``async def`` functions as well as plain callables (lambdas, wrappers)
    that merely return a coroutine, which a check on the function object
    alone would miss and leave un-awaited.

    Args:
        func: Callable to invoke; may be synchronous or a coroutine function.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        None. The callable's return value is discarded.

    Raises:
        RuntimeError: Propagated from ``asyncio.run`` if a coroutine has to be
            run while an event loop is already running in the current thread.
    """
    import asyncio
    import inspect

    result = func(*args, **kwargs)
    if inspect.iscoroutine(result):
        # A fresh event loop is created for, and closed after, each coroutine.
        asyncio.run(result)
@pytest.mark.asyncio
async def test_live_feed_and_set_tag_real():
    """End-to-end check of the datapoint live feed against the running server.

    Steps:
      1. Connect a real Socket.IO client and subscribe to the live feed.
      2. Verify the initial state lists every ``<driver>@<datapoint>`` tag
         declared in ``test_config.json`` (located next to this file).
      3. POST a raw tag update over HTTP and verify that the most recent
         update event carries the same tag identifier and value.

    The client is always disconnected, even when an assertion fails, so a
    failing run does not leave a dangling connection to the shared server.
    """
    import json
    import os

    # Connect to the live feed using a real SocketIO client
    sio = socketio.Client()
    received_initial = []
    received_updates = []

    @sio.on('datapoint_initial_state')
    def on_initial_state(data):
        received_initial.append(data)

    @sio.on('datapoint_tagupdatemsg')
    def on_datapoint_update(data):
        received_updates.append(data)

    sio.connect(SERVER_URL)
    try:
        sio.emit('datapoint_subscribe_live_feed')
        await asyncio.sleep(1)  # Wait for initial state

        # Check initial state contains all tags from test_config.json
        config_path = os.path.join(os.path.dirname(__file__), "test_config.json")
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)
        expected_tags = [
            f"{driver['name']}@{dp['name']}"
            for driver in config["drivers"]
            for dp in driver["datapoints"]
        ]
        # Fail with a readable message instead of an IndexError further down.
        assert expected_tags, "test_config.json declares no datapoints"

        assert received_initial, "No initial_state received"
        initial_tags = {dp['datapoint_identifier'] for dp in received_initial[0]}
        for tag in expected_tags:
            assert tag in initial_tags

        # Set a value for one tag using HTTP POST (if you have a REST endpoint), or via WebSocket
        test_tag = expected_tags[0]
        test_value = 123.45
        requests.post(
            f"{SERVER_URL}/datapoint_send_rawtagupdatemsg",
            json=RawTagUpdateMsg(test_tag, test_value, "good", None).to_dict(),
            timeout=10,  # never let a hung server block the test run forever
        )
        await asyncio.sleep(1)  # Wait for update

        assert received_updates, "No datapoint_update received after set_tag"
        update = received_updates[-1]
        assert update['datapoint_identifier'] == test_tag
        assert update['value'] == test_value
    finally:
        sio.disconnect()