Skip to content

Commit ceb807c

Browse files
committed
Implement an error callback and arrange for reconnection
* Splits the `setup()` method into separate `connect()` and `listen()` methods. * Adjust the `connect()` and `close()` methods so that the websocket may be reconnected and stay active. * Adds an `on_error()` callback that is invoked in the `except` block of `listen()` with default behavior of closing the connection. * Makes WebSocketClient a new-style class. These changes together are intended to make it possible to reconnect a disconnected client using an `on_error()` override like this: ``` def on_error(self, e): if isinstance(e, WebSocketConnectionClosedException): while not self.stop: try: self.close() self.connect() except Exception as e: logger.error(repr(e)) else: break else: logger.error(repr(e)) ```
1 parent 6ddf4f2 commit ceb807c

1 file changed

Lines changed: 17 additions & 8 deletions

File tree

GDAX/WebsocketClient.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,31 @@
88
from threading import Thread
99
from websocket import create_connection
1010

11-
class WebsocketClient():
11+
class WebsocketClient(object):
1212
def __init__(self, ws_url="wss://ws-feed-public.sandbox.gdax.com", product_id="BTC-USD"):
1313
if ws_url[-1] == "/":
1414
self.url = ws_url[:-1]
1515
else:
1616
self.url = ws_url
17-
self.stop = False
1817
self.product_id = product_id
19-
self.thread = Thread(target=self.setup)
18+
19+
def go():
20+
self.connect()
21+
self.listen()
22+
23+
self.thread = Thread(target=go)
2024
self.thread.start()
2125

22-
def setup(self):
26+
def connect(self):
2327
self.open()
28+
self.stop = False
2429
self.ws = create_connection(self.url)
2530
if type(self.product_id) is list:
2631
#product_ids - plural for multiple products
2732
subParams = json.dumps({"type": "subscribe", "product_ids": self.product_id})
2833
else:
2934
subParams = json.dumps({"type": "subscribe", "product_id": self.product_id})
3035
self.ws.send(subParams)
31-
self.listen()
3236

3337
def open(self):
3438
print("-- Subscribed! --")
@@ -38,16 +42,21 @@ def listen(self):
3842
try:
3943
msg = json.loads(self.ws.recv())
4044
except Exception as e:
41-
#print e
42-
break
45+
self.on_error(e)
4346
else:
4447
self.message(msg)
4548

49+
def on_error(self, e):
50+
self.close()
51+
4652
def message(self, msg):
4753
print(msg)
4854

4955
def close(self):
50-
self.ws.close()
56+
self.stop = True
57+
if self.ws and self.ws.connected:
58+
self.ws.close()
59+
self.ws = None
5160
self.closed()
5261

5362
def closed(self):

0 commit comments

Comments
 (0)