1+ import logging
12import pickle
3+ import time
24
35try :
46 import redis
79
810from .pubsub_manager import PubSubManager
911
12+ logger = logging .getLogger ('socketio' )
13+
1014
1115class RedisManager (PubSubManager ): # pragma: no cover
1216 """Redis based client manager.
@@ -38,8 +42,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
3842 raise RuntimeError ('Redis package is not installed '
3943 '(Run "pip install redis" in your '
4044 'virtualenv).' )
41- self .redis = redis . Redis . from_url ( url )
42- self .pubsub = self . redis . pubsub ()
45+ self .redis_url = url
46+ self ._redis_connect ()
4347 super (RedisManager , self ).__init__ (channel = channel ,
4448 write_only = write_only )
4549
@@ -58,13 +62,48 @@ def initialize(self):
5862 'Redis requires a monkey patched socket library to work '
5963 'with ' + self .server .async_mode )
6064
65+ def _redis_connect (self ):
66+ self .redis = redis .Redis .from_url (self .redis_url )
67+ self .pubsub = self .redis .pubsub ()
68+
6169 def _publish (self , data ):
62- return self .redis .publish (self .channel , pickle .dumps (data ))
70+ retry = True
71+ while True :
72+ try :
73+ if not retry :
74+ self ._redis_connect ()
75+ return self .redis .publish (self .channel , pickle .dumps (data ))
76+ except redis .exceptions .ConnectionError :
77+ if retry :
78+ logger .error ('Cannot publish to redis... retrying' )
79+ retry = False
80+ else :
81+ logger .error ('Cannot publish to redis... giving up' )
82+ break
83+
84+ def _redis_listen_with_retries (self ):
85+ retry_sleep = 1
86+ connect = False
87+ while True :
88+ try :
89+ if connect :
90+ self ._redis_connect ()
91+ self .pubsub .subscribe (self .channel )
92+ for message in self .pubsub .listen ():
93+ yield message
94+ except redis .exceptions .ConnectionError :
95+ logger .error ('Cannot receive from redis... '
96+ 'retrying in {} secs' .format (retry_sleep ))
97+ connect = True
98+ time .sleep (retry_sleep )
99+ retry_sleep *= 2
100+ if retry_sleep > 60 :
101+ retry_sleep = 60
63102
64103 def _listen (self ):
65104 channel = self .channel .encode ('utf-8' )
66105 self .pubsub .subscribe (self .channel )
67- for message in self .pubsub . listen ():
106+ for message in self ._redis_listen_with_retries ():
68107 if message ['channel' ] == channel and \
69108 message ['type' ] == 'message' and 'data' in message :
70109 yield message ['data' ]
0 commit comments