forked from massive-com/client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmock_server.py
More file actions
104 lines (97 loc) · 3.51 KB
/
mock_server.py
File metadata and controls
104 lines (97 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from websockets import serve # type: ignore
import asyncio
import json
port = 8765
subs = set()
async def mock_server(websocket):
await websocket.send(
'[{"ev":"status","status":"connected","message":"Connected Successfully"}]'
)
async for message in websocket:
message = json.loads(message)
if "action" not in message:
continue
if message["action"] == "auth":
await websocket.send(
'[{"ev":"status","status":"auth_success","message":"authenticated"}]'
)
elif message["action"] == "subscribe":
for p in message["params"].split(","):
subs.add(p)
await websocket.send(
json.dumps(
[
{
"ev": "status",
"status": "success",
"message": f"subscribed to: {p}",
}
]
)
)
await websocket.send(
json.dumps(
[
{
"ev": "T",
"sym": "AAPL",
"i": "5096",
"x": 10,
"p": 161.87,
"s": 300,
"c": [14, 41],
"t": 1651684192462,
"q": 4009402,
"z": 3,
}
]
)
)
await websocket.send(
json.dumps(
[
{
"ev": "T",
"sym": "AMZN",
"i": "72815",
"x": 12,
"p": 161.87,
"s": 1,
"c": [14, 37, 41],
"t": 1651684192536,
"q": 4009408,
"z": 3,
},
{
"ev": "T",
"sym": "AMZN",
"i": "799",
"x": 4,
"p": 161.87,
"s": 100,
"t": 1651684192717,
"q": 4009434,
"z": 3,
},
]
)
)
elif message["action"] == "unsubscribe":
for p in message["params"].split(","):
subs.discard(p)
await websocket.send(
json.dumps(
[
{
"ev": "status",
"status": "success",
"message": f"unsubscribed to: {p}",
}
]
)
)
async def run_mock_server():
async with serve(mock_server, "localhost", port):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(run_mock_server())