forked from danpaquin/coinbasepro-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebsocketClient.py
More file actions
105 lines (86 loc) · 2.85 KB
/
WebsocketClient.py
File metadata and controls
105 lines (86 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#
# GDAX/WebsocketClient.py
# Daniel Paquin
#
# Template object to receive messages from the GDAX Websocket Feed
from __future__ import print_function
import json
import time
from threading import Thread
from websocket import create_connection
class WebsocketClient(object):
    """Template client for the GDAX websocket feed.

    Subclass it and override ``onOpen``, ``onMessage``, ``onClose`` and
    ``onError`` to react to feed events.  ``start()`` opens the socket and
    reads messages on a background thread until ``close()`` is called or
    receiving a message fails.

    Args:
        url: Websocket endpoint.  Defaults to ``wss://ws-feed.gdax.com``.
        products: A single product id or a list of product ids.  When left
            as ``None`` it becomes ``["BTC-USD"]`` once the subscription is
            sent.
        type: Pass ``"heartbeat"`` to also switch heartbeat messages on.
            Falsy values are stored as ``"subscribe"``.
    """

    def __init__(self, url=None, products=None, type=None):
        if url is None:
            url = "wss://ws-feed.gdax.com"
        self.url = url
        self.products = products
        self.type = type or "subscribe"
        # None: never started, False: listening, True: closed.
        self.stop = None
        self.ws = None
        self.thread = None

    def start(self):
        """Call ``onOpen``, open the socket and start the listener thread."""
        def _go():
            self._connect()
            self._listen()

        # onOpen runs before the socket is created so that an override can
        # still change ``url`` and ``products``.
        self.onOpen()
        self.ws = create_connection(self.url)
        self.thread = Thread(target=_go)
        self.thread.start()

    def _connect(self):
        """Normalise the settings and send the subscription message(s)."""
        if self.products is None:
            self.products = ["BTC-USD"]
        elif not isinstance(self.products, list):
            self.products = [self.products]

        # endswith() is safe on an empty string, unlike indexing with [-1].
        if self.url.endswith("/"):
            self.url = self.url[:-1]

        self.stop = False
        sub_params = {'type': 'subscribe', 'product_ids': self.products}
        self.ws.send(json.dumps(sub_params))
        # Compare by value: ``is`` only matches when both strings happen to
        # be the same object.
        if self.type == "heartbeat":
            sub_params = {"type": "heartbeat", "on": True}
            self.ws.send(json.dumps(sub_params))

    def _listen(self):
        """Receive and dispatch messages until ``stop`` becomes true."""
        while not self.stop:
            try:
                msg = json.loads(self.ws.recv())
            except Exception as e:
                # A failed receive or decode ends the session.
                self.onError(e)
                self.close()
            else:
                self.onMessage(msg)

    def close(self):
        """Stop listening and close the socket.

        Safe to call before ``start()`` and more than once.  ``onClose`` is
        only invoked the first time a running client is closed.
        """
        try:
            if self.stop is False:
                # Flag first so a re-entrant close() cannot run this twice.
                self.stop = True
                if self.type == "heartbeat":
                    try:
                        self.ws.send(
                            json.dumps({"type": "heartbeat", "on": False}))
                    except Exception as e:
                        # The socket may already be broken (close() is also
                        # called after a failed receive); report and go on.
                        self.onError(e)
                self.onClose()
        finally:
            # Release the socket even if a callback above raised.
            if self.ws is not None:
                self.ws.close()

    def onOpen(self):
        """Hook called by ``start()`` before the socket is created."""
        print("-- Subscribed! --\n")

    def onClose(self):
        """Hook called once when a running client is closed."""
        print("\n-- Socket Closed --")

    def onMessage(self, msg):
        """Hook called with every decoded message."""
        print(msg)

    def onError(self, e):
        """Hook called with the exception from a failed receive or send.

        The default reports the error rather than raising, so the caller
        can still go on to close the socket.
        """
        print("-- Error: %s --" % (e,))
if __name__ == "__main__":
    # Demo: count feed messages for two products, then shut down.
    # It subclasses the WebsocketClient defined in this file, so the GDAX
    # package does not have to be importable; ``time`` is already imported
    # at the top of the module.
    class myWebsocketClient(WebsocketClient):
        def onOpen(self):
            self.url = "wss://ws-feed.gdax.com/"
            self.products = ["BTC-USD", "ETH-USD"]
            self.MessageCount = 0
            print("Lets count the messages!")

        def onMessage(self, msg):
            # A message without a "price" field would raise KeyError here
            # and kill the listener thread, so only format it when present.
            if "price" in msg:
                print("Message type:", msg.get("type"),
                      "\t@ %.3f" % float(msg["price"]))
            else:
                print("Message type:", msg.get("type"))
            self.MessageCount += 1

        def onClose(self):
            print("-- Goodbye! --")

    wsClient = myWebsocketClient()
    wsClient.start()
    print(wsClient.url, wsClient.products)
    # Do some logic with the data.  Also leave the loop once the client has
    # stopped itself (e.g. after a receive error); otherwise the count would
    # never reach 500 and this would spin forever.
    while wsClient.MessageCount < 500 and not wsClient.stop:
        # Format inside the call: print() returns None, so applying ``%``
        # to its result raises TypeError.
        print("\nMessageCount =", "%i \n" % wsClient.MessageCount)
        time.sleep(1)
    wsClient.close()