forked from massive-com/client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_conn.py
More file actions
71 lines (67 loc) · 2.09 KB
/
test_conn.py
File metadata and controls
71 lines (67 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from polygon import WebSocketClient
from base_ws import BaseTest
from mock_server import subs, port
import asyncio
from polygon.websocket import EquityTrade
class WebSocketsTest(BaseTest):
async def test_conn(self):
c = WebSocketClient(
api_key="", feed=f"localhost:{port}", verbose=True, secure=False
)
self.expectResponse(
[
EquityTrade(
event_type="T",
symbol="AAPL",
exchange=10,
id="5096",
tape=3,
price=161.87,
size=300,
conditions=[14, 41],
timestamp=1651684192462,
sequence_number=4009402,
)
]
)
c.subscribe("T.AAPL")
def binded(msg):
self.expectProcessor(msg)
asyncio.get_event_loop().create_task(c.connect(binded))
self.expectResponse(
[
EquityTrade(
event_type="T",
symbol="AMZN",
exchange=12,
id="72815",
tape=3,
price=161.87,
size=1,
conditions=[14, 37, 41],
timestamp=1651684192536,
sequence_number=4009408,
),
EquityTrade(
event_type="T",
symbol="AMZN",
exchange=4,
id="799",
tape=3,
price=161.87,
size=100,
conditions=None,
timestamp=1651684192717,
sequence_number=4009434,
),
]
)
c.subscribe("T.AMZN")
self.assertEqual(subs, c.subs)
c.unsubscribe_all()
self.assertEqual(subs, set())
c.subscribe("T.*")
self.assertEqual(subs, c.subs)
c.unsubscribe("T.*")
self.assertEqual(subs, set())
await c.close()