From 4e8b04989b230a9fa722e4dffc66492048930230 Mon Sep 17 00:00:00 2001 From: Vel Lesikov Date: Thu, 24 Jun 2021 18:30:12 -0700 Subject: [PATCH 1/7] Update requests version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index ea047851..47e1ca9c 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ install_requires = [ 'sortedcontainers>=1.5.9', - 'requests>=2.13.0', + 'requests>=2.25.0', 'six>=1.10.0', 'websocket-client>=0.40.0', 'pymongo>=3.5.1', From 5d38d21f523885340cda8b8b3fbfb091ca49a099 Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Mon, 27 Nov 2017 16:05:13 -0800 Subject: [PATCH 2/7] Removed WebsocketClient inheritance from OrderBook class This is meant to change the application workflow to only require one WebsocketClient, which can feed messages to data structures for further processing. --- cbpro/order_book.py | 144 +++++++++++++++++++++++--------------------- 1 file changed, 76 insertions(+), 68 deletions(-) diff --git a/cbpro/order_book.py b/cbpro/order_book.py index 0f393d9b..beeefc74 100644 --- a/cbpro/order_book.py +++ b/cbpro/order_book.py @@ -6,16 +6,15 @@ from sortedcontainers import SortedDict from decimal import Decimal +import Queue import pickle from cbpro.public_client import PublicClient from cbpro.websocket_client import WebsocketClient -class OrderBook(WebsocketClient): +class OrderBook(object): def __init__(self, product_id='BTC-USD', log_to=None): - super(OrderBook, self).__init__( - products=product_id, channels=['full']) self._asks = SortedDict() self._bids = SortedDict() self._client = PublicClient() @@ -24,18 +23,7 @@ def __init__(self, product_id='BTC-USD', log_to=None): if self._log_to: assert hasattr(self._log_to, 'write') self._current_ticker = None - - @property - def product_id(self): - ''' Currently OrderBook only supports a single product even though it is stored as a list of products. ''' - return self.products[0] - - def on_open(self): - self._sequence = -1 - print("-- Subscribed to OrderBook! --\n") - - def on_close(self): - print("\n-- OrderBook Socket Closed! --") + self.product_id = product_id def reset_book(self): self._asks = SortedDict() @@ -57,33 +45,34 @@ def reset_book(self): }) self._sequence = res['sequence'] - def on_message(self, message): - if self._log_to: - pickle.dump(message, self._log_to) + def process_message(self, message): + if message.get('product_id') == self.product_id: + if self._log_to: + pickle.dump(message, self._log_to) - sequence = message.get('sequence', -1) - if self._sequence == -1: - self.reset_book() - return - if sequence <= self._sequence: - # ignore older messages (e.g. before order book initialization from getProductOrderBook) - return - elif sequence > self._sequence + 1: - self.on_sequence_gap(self._sequence, sequence) - return + sequence = message.get('sequence', -1) + if self._sequence == -1: + self.reset_book() + return + if sequence <= self._sequence: + # ignore older messages (e.g. before order book initialization from getProductOrderBook) + return + elif sequence > self._sequence + 1: + self.on_sequence_gap(self._sequence, sequence) + return - msg_type = message['type'] - if msg_type == 'open': - self.add(message) - elif msg_type == 'done' and 'price' in message: - self.remove(message) - elif msg_type == 'match': - self.match(message) - self._current_ticker = message - elif msg_type == 'change': - self.change(message) + msg_type = message['type'] + if msg_type == 'open': + self.add(message) + elif msg_type == 'done' and 'price' in message: + self.remove(message) + elif msg_type == 'match': + self.match(message) + self._current_ticker = message + elif msg_type == 'change': + self.change(message) - self._sequence = sequence + self._sequence = sequence def on_sequence_gap(self, gap_start, gap_end): self.reset_book() @@ -249,7 +238,6 @@ def set_bids(self, price, bids): import time import datetime as dt - class OrderBookConsole(OrderBook): ''' Logs real-time changes to the bid-ask spread to the console ''' @@ -262,38 +250,58 @@ def __init__(self, product_id=None): self._bid_depth = None self._ask_depth = None - def on_message(self, message): - super(OrderBookConsole, self).on_message(message) - - # Calculate newest bid-ask spread - bid = self.get_bid() - bids = self.get_bids(bid) - bid_depth = sum([b['size'] for b in bids]) - ask = self.get_ask() - asks = self.get_asks(ask) - ask_depth = sum([a['size'] for a in asks]) - - if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth: - # If there are no changes to the bid-ask spread since the last update, no need to print - pass - else: - # If there are differences, update the cache - self._bid = bid - self._ask = ask - self._bid_depth = bid_depth - self._ask_depth = ask_depth - print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format( - dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask)) - - order_book = OrderBookConsole() - order_book.start() + def process_message(self, message): + if message.get('product_id') == self.product_id: + super(OrderBookConsole, self).process_message(message) + + try: + # Calculate newest bid-ask spread + bid = self.get_bid() + bids = self.get_bids(bid) + bid_depth = sum([b['size'] for b in bids]) + ask = self.get_ask() + asks = self.get_asks(ask) + ask_depth = sum([a['size'] for a in asks]) + + if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth: + # If there are no changes to the bid-ask spread since the last update, no need to print + pass + else: + # If there are differences, update the cache + self._bid = bid + self._ask = ask + self._bid_depth = bid_depth + self._ask_depth = ask_depth + print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format( + dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask)) + except Exception: + pass + + class WebsocketConsole(WebsocketClient): + def on_open(self): + self.products = ['BTC-USD', 'ETH-USD'] + self.websocket_queue = Queue.Queue() + + def on_message(self, msg): + self.websocket_queue.put(msg) + + order_book_btc = OrderBookConsole(product_id='BTC-USD') + order_book_eth = OrderBookConsole(product_id='ETH-USD') + + wsClient = WebsocketConsole() + wsClient.start() + time.sleep(10) try: while True: - time.sleep(10) + msg = wsClient.websocket_queue.get(timeout=15) + order_book_btc.process_message(msg) + order_book_eth.process_message(msg) except KeyboardInterrupt: - order_book.close() + wsClient.close() + except Exception: + pass - if order_book.error: + if wsClient.error: sys.exit(1) else: sys.exit(0) From bb66088ddeec2de63ac101d8a81c6c11a921da15 Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Mon, 27 Nov 2017 17:07:36 -0800 Subject: [PATCH 3/7] Updated README to reflect OrderBook changes --- README.md | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 37872be3..b1a63c14 100644 --- a/README.md +++ b/README.md @@ -346,16 +346,30 @@ python -m pytest ``` ### Real-time OrderBook -The ```OrderBook``` subscribes to a websocket and keeps a real-time record of -the orderbook for the product_id input. Please provide your feedback for future +The ```OrderBook``` is a convenient data structure to keep a real-time record of +the orderbook for the product_id input. It processes incoming messages from an +already existing WebsocketClient. Please provide your feedback for future improvements. ```python -import cbpro, time -order_book = cbpro.OrderBook(product_id='BTC-USD') -order_book.start() +import cbpro, time, Queue +class myWebsocketClient(cbpro.WebsocketClient): + def on_open(self): + self.products = ['BTC-USD', 'ETH-USD'] + self.websocket_queue = Queue.Queue() + def on_message(self, msg): + self.websocket_queue.put(msg) + +order_book_btc = cbpro.OrderBook(product_id='BTC-USD') +order_book_eth = cbpro.OrderBook(product_id='ETH-USD') +wsClient = myWebsocketClient() +wsClient.start() time.sleep(10) -order_book.close() +while True: + msg = wsClient.websocket_queue.get(timeout=15) + order_book.process_message(msg) + print(order_book_btc.get_ask()) + print(order_book_eth.get_bid()) ``` ### Testing From 002fdfe4ca5b6b124088fe124f6687abdd2cbae7 Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Thu, 30 Nov 2017 13:50:13 -0800 Subject: [PATCH 4/7] Remove print from OrderBook.on_sequence_gap() --- cbpro/order_book.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cbpro/order_book.py b/cbpro/order_book.py index beeefc74..035fa52f 100644 --- a/cbpro/order_book.py +++ b/cbpro/order_book.py @@ -76,9 +76,6 @@ def process_message(self, message): def on_sequence_gap(self, gap_start, gap_end): self.reset_book() - print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format( - gap_start, gap_end, self._sequence)) - def add(self, order): order = { From 53ab2f3a9630bfea0b54d3810eba1b0c81507942 Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Thu, 30 Nov 2017 13:57:21 -0800 Subject: [PATCH 5/7] Updated contributors --- contributors.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contributors.txt b/contributors.txt index b1e5c495..3a6695b6 100644 --- a/contributors.txt +++ b/contributors.txt @@ -3,4 +3,5 @@ Leonard Lin Jeff Gibson David Caseria Paul Mestemaker -Drew Rice \ No newline at end of file +Drew Rice +Mike Cardillo \ No newline at end of file From 13147c0c923e3afe91f889ad3572391e7a158e71 Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Sun, 10 Dec 2017 14:29:46 -0800 Subject: [PATCH 6/7] Removed reference to Queue in WebsocketConsole Removed to maintain backward compatability --- cbpro/order_book.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/cbpro/order_book.py b/cbpro/order_book.py index 035fa52f..1a8525e5 100644 --- a/cbpro/order_book.py +++ b/cbpro/order_book.py @@ -6,7 +6,6 @@ from sortedcontainers import SortedDict from decimal import Decimal -import Queue import pickle from cbpro.public_client import PublicClient @@ -277,22 +276,19 @@ def process_message(self, message): class WebsocketConsole(WebsocketClient): def on_open(self): self.products = ['BTC-USD', 'ETH-USD'] - self.websocket_queue = Queue.Queue() + self.order_book_btc = OrderBookConsole(product_id='BTC-USD') + self.order_book_eth = OrderBookConsole(product_id='ETH-USD') def on_message(self, msg): - self.websocket_queue.put(msg) - - order_book_btc = OrderBookConsole(product_id='BTC-USD') - order_book_eth = OrderBookConsole(product_id='ETH-USD') + self.order_book_btc.process_message(msg) + self.order_book_eth.process_message(msg) wsClient = WebsocketConsole() wsClient.start() time.sleep(10) try: while True: - msg = wsClient.websocket_queue.get(timeout=15) - order_book_btc.process_message(msg) - order_book_eth.process_message(msg) + pass except KeyboardInterrupt: wsClient.close() except Exception: From 789bbab7c5183d9d29f956b24a2ad5ae9705549c Mon Sep 17 00:00:00 2001 From: Mike Cardillo Date: Sun, 10 Dec 2017 21:27:46 -0800 Subject: [PATCH 7/7] Updated README to give example without Queue --- README.md | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b1a63c14..456d212e 100644 --- a/README.md +++ b/README.md @@ -356,20 +356,19 @@ import cbpro, time, Queue class myWebsocketClient(cbpro.WebsocketClient): def on_open(self): self.products = ['BTC-USD', 'ETH-USD'] - self.websocket_queue = Queue.Queue() + self.order_book_btc = OrderBookConsole(product_id='BTC-USD') + self.order_book_eth = OrderBookConsole(product_id='ETH-USD') def on_message(self, msg): - self.websocket_queue.put(msg) + self.order_book_btc.process_message(msg) + self.order_book_eth.process_message(msg) -order_book_btc = cbpro.OrderBook(product_id='BTC-USD') -order_book_eth = cbpro.OrderBook(product_id='ETH-USD') wsClient = myWebsocketClient() wsClient.start() time.sleep(10) while True: - msg = wsClient.websocket_queue.get(timeout=15) - order_book.process_message(msg) - print(order_book_btc.get_ask()) - print(order_book_eth.get_bid()) + print(wsClient.order_book_btc.get_ask()) + print(wsClient.order_book_eth.get_bid()) + time.sleep(1) ``` ### Testing