From f2205140c3c8720ce89219a83c5921c1cd6aeab7 Mon Sep 17 00:00:00 2001 From: moyosore Date: Tue, 6 Dec 2022 13:37:39 +0000 Subject: [PATCH 1/6] implement connection_state_ttl --- ably/realtime/connection.py | 31 +++++++++++++++++++++++++++++-- ably/transport/defaults.py | 4 +++- ably/types/options.py | 25 ++++++++++++++++++++----- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index b79898da..4082d5e0 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -21,6 +21,7 @@ class ConnectionState(str, Enum): CLOSING = 'closing' CLOSED = 'closed' FAILED = 'failed' + SUSPENDED = "suspended" @dataclass @@ -131,13 +132,27 @@ def __init__(self, realtime, initial_state): self.__ping_future = None self.__timeout_in_secs = self.options.realtime_request_timeout / 1000 self.transport: WebSocketTransport | None = None + self.__ttl_task = None + self.__retry_task = None super().__init__() def enact_state_change(self, state, reason=None): current_state = self.__state self.__state = state + print(self.state, "enact") + if self.state == ConnectionState.DISCONNECTED: + if not self.__ttl_task or self.__ttl_task.done(): + self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) self._emit('connectionstate', ConnectionStateChange(current_state, state, reason)) + async def __connection_state_ttl(self): + await asyncio.sleep(self.ably.options.connection_state_ttl) + exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) + self.enact_state_change(ConnectionState.SUSPENDED, exception) + if self.__retry_task: + self.__retry_task.cancel() + asyncio.create_task(self.retry_connection_attempt()) + async def connect(self): if not self.__connected_future: self.__connected_future = asyncio.Future() @@ -145,11 +160,13 @@ async def connect(self): await self.__connected_future def try_connect(self): + print("erm", self.__state) task = asyncio.create_task(self._connect()) task.add_done_callback(self.on_connection_attempt_done) async def _connect(self): if self.__state == ConnectionState.CONNECTED: + self.__ttl_task.cancel() return if self.__state == ConnectionState.CONNECTING: @@ -177,14 +194,22 @@ def on_connection_attempt_done(self, task): if self.__state in (ConnectionState.CLOSED, ConnectionState.FAILED): return if self.__state != ConnectionState.DISCONNECTED: + print("howdy") if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None self.enact_state_change(ConnectionState.DISCONNECTED, exception) - asyncio.create_task(self.retry_connection_attempt()) + self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def retry_connection_attempt(self): - await asyncio.sleep(self.ably.options.disconnected_retry_timeout / 1000) + print("retrying", self.__state) + if self.state == ConnectionState.SUSPENDED: + print("suspended") + retry_timeout = self.ably.options.suspended_retry_timeout / 1000 + else: + print("not yet") + retry_timeout = self.ably.options.disconnected_retry_timeout / 1000 + await asyncio.sleep(retry_timeout) self.try_connect() async def close(self): @@ -272,6 +297,8 @@ async def on_protocol_message(self, msg): self.__connected_future = None else: log.warn('CONNECTED message received but connected_future not set') + if self.__ttl_task: + self.__ttl_task.cancel() self.enact_state_change(ConnectionState.CONNECTED) if action == ProtocolMessageAction.ERROR: # ERROR error = msg["error"] diff --git a/ably/transport/defaults.py b/ably/transport/defaults.py index 6b0fec88..915d3ef8 100644 --- a/ably/transport/defaults.py +++ b/ably/transport/defaults.py @@ -20,7 +20,9 @@ class Defaults: comet_recv_timeout = 90000 comet_send_timeout = 10000 realtime_request_timeout = 10000 - disconnected_retry_timeout = 1500 + disconnected_retry_timeout = 15000 + connection_state_ttl = 120000 + suspended_retry_timeout = 30000 transports = [] # ["web_socket", "comet"] diff --git a/ably/types/options.py b/ably/types/options.py index 0a926992..e4d8aef1 100644 --- a/ably/types/options.py +++ b/ably/types/options.py @@ -9,14 +9,13 @@ class Options(AuthOptions): - def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, - realtime_host=None, port=0, tls_port=0, use_binary_protocol=True, - queue_messages=False, recover=False, environment=None, + def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0, + tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None, http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None, http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None, fallback_hosts_use_default=None, fallback_retry_timeout=None, disconnected_retry_timeout=None, - idempotent_rest_publishing=None, loop=None, auto_connect=True, - **kwargs): + idempotent_rest_publishing=None, loop=None, auto_connect=True, connection_state_ttl=None, + suspended_retry_timeout=None, **kwargs): super().__init__(**kwargs) # TODO check these defaults @@ -29,6 +28,12 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, if disconnected_retry_timeout is None: disconnected_retry_timeout = Defaults.disconnected_retry_timeout + if connection_state_ttl is None: + connection_state_ttl = Defaults.connection_state_ttl + + if suspended_retry_timeout is None: + suspended_retry_timeout = Defaults.suspended_retry_timeout + if environment is not None and rest_host is not None: raise ValueError('specify rest_host or environment, not both') @@ -62,6 +67,8 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, self.__idempotent_rest_publishing = idempotent_rest_publishing self.__loop = loop self.__auto_connect = auto_connect + self.__connection_state_ttl = connection_state_ttl + self.__suspended_retry_timeout = suspended_retry_timeout self.__rest_hosts = self.__get_rest_hosts() self.__realtime_hosts = self.__get_realtime_hosts() @@ -214,6 +221,14 @@ def loop(self): def auto_connect(self): return self.__auto_connect + @property + def connection_state_ttl(self): + return self.__connection_state_ttl + + @property + def suspended_retry_timeout(self): + return self.__suspended_retry_timeout + def __get_rest_hosts(self): """ Return the list of hosts as they should be tried. First comes the main From d5bf5186f6b93754df188d40933e30e2ae8d761c Mon Sep 17 00:00:00 2001 From: moyosore Date: Wed, 7 Dec 2022 15:06:42 +0000 Subject: [PATCH 2/6] override ttl with connection details ttl --- ably/realtime/connection.py | 16 +++++++++------- ably/types/options.py | 4 ++++ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index 4082d5e0..a56ea69b 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -134,19 +134,21 @@ def __init__(self, realtime, initial_state): self.transport: WebSocketTransport | None = None self.__ttl_task = None self.__retry_task = None + self.__connection_details = None super().__init__() def enact_state_change(self, state, reason=None): current_state = self.__state self.__state = state - print(self.state, "enact") if self.state == ConnectionState.DISCONNECTED: if not self.__ttl_task or self.__ttl_task.done(): self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) self._emit('connectionstate', ConnectionStateChange(current_state, state, reason)) async def __connection_state_ttl(self): - await asyncio.sleep(self.ably.options.connection_state_ttl) + if self.__connection_details: + self.ably.options.connection_state_ttl = self.__connection_details["connectionStateTtl"] + await asyncio.sleep(self.ably.options.connection_state_ttl / 1000) exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) self.enact_state_change(ConnectionState.SUSPENDED, exception) if self.__retry_task: @@ -160,7 +162,6 @@ async def connect(self): await self.__connected_future def try_connect(self): - print("erm", self.__state) task = asyncio.create_task(self._connect()) task.add_done_callback(self.on_connection_attempt_done) @@ -194,7 +195,6 @@ def on_connection_attempt_done(self, task): if self.__state in (ConnectionState.CLOSED, ConnectionState.FAILED): return if self.__state != ConnectionState.DISCONNECTED: - print("howdy") if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None @@ -202,12 +202,9 @@ def on_connection_attempt_done(self, task): self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def retry_connection_attempt(self): - print("retrying", self.__state) if self.state == ConnectionState.SUSPENDED: - print("suspended") retry_timeout = self.ably.options.suspended_retry_timeout / 1000 else: - print("not yet") retry_timeout = self.ably.options.disconnected_retry_timeout / 1000 await asyncio.sleep(retry_timeout) self.try_connect() @@ -299,6 +296,7 @@ async def on_protocol_message(self, msg): log.warn('CONNECTED message received but connected_future not set') if self.__ttl_task: self.__ttl_task.cancel() + self.__connection_details = msg['connectionDetails'] self.enact_state_change(ConnectionState.CONNECTED) if action == ProtocolMessageAction.ERROR: # ERROR error = msg["error"] @@ -337,3 +335,7 @@ def ably(self): @property def state(self): return self.__state + + @property + def connection_details(self): + return self.__connection_details diff --git a/ably/types/options.py b/ably/types/options.py index e4d8aef1..70b79b40 100644 --- a/ably/types/options.py +++ b/ably/types/options.py @@ -225,6 +225,10 @@ def auto_connect(self): def connection_state_ttl(self): return self.__connection_state_ttl + @connection_state_ttl.setter + def connection_state_ttl(self, value): + self.__connection_state_ttl = value + @property def suspended_retry_timeout(self): return self.__suspended_retry_timeout From 7b4162bd4b9b184f2c9a2fb5bd504dd9c8cf83a1 Mon Sep 17 00:00:00 2001 From: moyosore Date: Thu, 8 Dec 2022 12:45:07 +0000 Subject: [PATCH 3/6] update suspended state behaviour --- ably/realtime/connection.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index a56ea69b..ec4a647b 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -135,12 +135,13 @@ def __init__(self, realtime, initial_state): self.__ttl_task = None self.__retry_task = None self.__connection_details = None + self.__in_suspended_state = False super().__init__() def enact_state_change(self, state, reason=None): current_state = self.__state self.__state = state - if self.state == ConnectionState.DISCONNECTED: + if self.__state == ConnectionState.DISCONNECTED: if not self.__ttl_task or self.__ttl_task.done(): self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) self._emit('connectionstate', ConnectionStateChange(current_state, state, reason)) @@ -151,9 +152,10 @@ async def __connection_state_ttl(self): await asyncio.sleep(self.ably.options.connection_state_ttl / 1000) exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) self.enact_state_change(ConnectionState.SUSPENDED, exception) + self.__in_suspended_state = True if self.__retry_task: self.__retry_task.cancel() - asyncio.create_task(self.retry_connection_attempt()) + self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def connect(self): if not self.__connected_future: @@ -167,7 +169,8 @@ def try_connect(self): async def _connect(self): if self.__state == ConnectionState.CONNECTED: - self.__ttl_task.cancel() + if self.__ttl_task: + self.__ttl_task.cancel() return if self.__state == ConnectionState.CONNECTING: @@ -198,14 +201,18 @@ def on_connection_attempt_done(self, task): if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None - self.enact_state_change(ConnectionState.DISCONNECTED, exception) + if self.__in_suspended_state: + self.enact_state_change(ConnectionState.SUSPENDED, exception) + else: + self.enact_state_change(ConnectionState.DISCONNECTED, exception) self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def retry_connection_attempt(self): - if self.state == ConnectionState.SUSPENDED: + if self.__in_suspended_state: retry_timeout = self.ably.options.suspended_retry_timeout / 1000 else: retry_timeout = self.ably.options.disconnected_retry_timeout / 1000 + await asyncio.sleep(retry_timeout) self.try_connect() @@ -294,6 +301,7 @@ async def on_protocol_message(self, msg): self.__connected_future = None else: log.warn('CONNECTED message received but connected_future not set') + self.__in_suspended_state = False if self.__ttl_task: self.__ttl_task.cancel() self.__connection_details = msg['connectionDetails'] From 64f330780aa2a03bb6011634265767270578e9ea Mon Sep 17 00:00:00 2001 From: moyosore Date: Fri, 9 Dec 2022 14:15:46 +0000 Subject: [PATCH 4/6] add test for connection state ttl --- ably/realtime/connection.py | 6 ++++-- ably/realtime/realtime.py | 6 ++++++ test/ably/realtimeconnection_test.py | 23 +++++++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index ec4a647b..613c954c 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -212,7 +212,6 @@ async def retry_connection_attempt(self): retry_timeout = self.ably.options.suspended_retry_timeout / 1000 else: retry_timeout = self.ably.options.disconnected_retry_timeout / 1000 - await asyncio.sleep(retry_timeout) self.try_connect() @@ -242,7 +241,10 @@ async def close(self): log.warning('ConnectionManager: called close with no connected transport') self.enact_state_change(ConnectionState.CLOSED) if self.transport and self.transport.ws_connect_task is not None: - await self.transport.ws_connect_task + try: + await self.transport.ws_connect_task + except AblyException as e: + log.warning(f'Connection error encountered while closing: {e}') async def connect_impl(self): self.transport = WebSocketTransport(self) diff --git a/ably/realtime/realtime.py b/ably/realtime/realtime.py index 75e3270a..b5d80626 100644 --- a/ably/realtime/realtime.py +++ b/ably/realtime/realtime.py @@ -64,6 +64,12 @@ def __init__(self, key=None, loop=None, **kwargs): fallback_hosts: list[str] An array of fallback hosts to be used in the case of an error necessitating the use of an alternative host. If you have been provided a set of custom fallback hosts by Ably, please specify them here. + connection_state_ttl: float + The duration that Ably will persist the connection state for when a Realtime client is abruptly + disconnected. + suspended_retry_timeout: float + When the connection enters the SUSPENDED state, after this delay, if the state is still SUSPENDED, + the client library attempts to reconnect automatically. The default is 30 seconds. Raises ------ ValueError diff --git a/test/ably/realtimeconnection_test.py b/test/ably/realtimeconnection_test.py index 86883f25..3521e6bb 100644 --- a/test/ably/realtimeconnection_test.py +++ b/test/ably/realtimeconnection_test.py @@ -206,6 +206,7 @@ async def test_unroutable_host(self): assert exception.value.status_code == 504 assert ably.connection.state == ConnectionState.DISCONNECTED assert ably.connection.error_reason == exception.value + await ably.close() async def test_invalid_host(self): ably = await RestSetup.get_ably_realtime(realtime_host="iamnotahost") @@ -215,3 +216,25 @@ async def test_invalid_host(self): assert exception.value.status_code == 400 assert ably.connection.state == ConnectionState.DISCONNECTED assert ably.connection.error_reason == exception.value + await ably.close() + + async def test_connection_state_ttl(self): + ably = await RestSetup.get_ably_realtime(realtime_host="iamnotahost", connection_state_ttl=2000) + changes = [] + suspended_future = asyncio.Future() + + def on_state_change(state_change): + changes.append(state_change) + if state_change.current == ConnectionState.SUSPENDED: + suspended_future.set_result(None) + with pytest.raises(AblyException) as exception: + await ably.connect() + ably.connection.on(on_state_change) + assert exception.value.code == 40000 + assert exception.value.status_code == 400 + assert ably.connection.state == ConnectionState.DISCONNECTED + await suspended_future + assert ably.connection.state == changes[-1].current + assert ably.connection.state == ConnectionState.SUSPENDED + assert ably.connection.error_reason == changes[-1].reason + await ably.close() From d03be7a3bb7a61a05068d374541d77ee502c5100 Mon Sep 17 00:00:00 2001 From: moyosore Date: Mon, 19 Dec 2022 12:22:53 +0000 Subject: [PATCH 5/6] handle connected message --- ably/realtime/connection.py | 43 ++++++++++++++++++++++++++----------- ably/realtime/realtime.py | 5 +++-- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index 613c954c..748056f0 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -21,13 +21,26 @@ class ConnectionState(str, Enum): CLOSING = 'closing' CLOSED = 'closed' FAILED = 'failed' - SUSPENDED = "suspended" + SUSPENDED = 'suspended' + + +class ConnectionEvent(str): + INITIALIZED = 'initialized' + CONNECTING = 'connecting' + CONNECTED = 'connected' + DISCONNECTED = 'disconnected' + CLOSING = 'closing' + CLOSED = 'closed' + FAILED = 'failed' + SUSPENDED = 'suspended' + UPDATE = 'update' @dataclass class ConnectionStateChange: previous: ConnectionState current: ConnectionState + event: ConnectionEvent reason: Optional[AblyException] = None @@ -138,20 +151,20 @@ def __init__(self, realtime, initial_state): self.__in_suspended_state = False super().__init__() - def enact_state_change(self, state, reason=None): + def enact_state_change(self, state, event, reason=None): current_state = self.__state self.__state = state if self.__state == ConnectionState.DISCONNECTED: if not self.__ttl_task or self.__ttl_task.done(): self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) - self._emit('connectionstate', ConnectionStateChange(current_state, state, reason)) + self._emit('connectionstate', ConnectionStateChange(current_state, state, event, reason)) async def __connection_state_ttl(self): if self.__connection_details: self.ably.options.connection_state_ttl = self.__connection_details["connectionStateTtl"] await asyncio.sleep(self.ably.options.connection_state_ttl / 1000) exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) - self.enact_state_change(ConnectionState.SUSPENDED, exception) + self.enact_state_change(ConnectionState.SUSPENDED, ConnectionEvent.SUSPENDED, exception) self.__in_suspended_state = True if self.__retry_task: self.__retry_task.cancel() @@ -184,7 +197,7 @@ async def _connect(self): log.info('Connection cancelled due to request timeout. Attempting reconnection...') raise exception else: - self.enact_state_change(ConnectionState.CONNECTING) + self.enact_state_change(ConnectionState.CONNECTING, ConnectionEvent.CONNECTING) await self.connect_impl() def on_connection_attempt_done(self, task): @@ -202,9 +215,9 @@ def on_connection_attempt_done(self, task): self.__connected_future.set_exception(exception) self.__connected_future = None if self.__in_suspended_state: - self.enact_state_change(ConnectionState.SUSPENDED, exception) + self.enact_state_change(ConnectionState.SUSPENDED, ConnectionEvent.SUSPENDED, exception) else: - self.enact_state_change(ConnectionState.DISCONNECTED, exception) + self.enact_state_change(ConnectionState.DISCONNECTED, ConnectionEvent.DISCONNECTED, exception) self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def retry_connection_attempt(self): @@ -217,19 +230,19 @@ async def retry_connection_attempt(self): async def close(self): if self.__state in (ConnectionState.CLOSED, ConnectionState.INITIALIZED, ConnectionState.FAILED): - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) return if self.__state is ConnectionState.DISCONNECTED: if self.transport: await self.transport.dispose() self.transport = None - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) return if self.__state != ConnectionState.CONNECTED: log.warning('Connection.closed called while connection state not connected') if self.__state == ConnectionState.CONNECTING: await self.__connected_future - self.enact_state_change(ConnectionState.CLOSING) + self.enact_state_change(ConnectionState.CLOSING, ConnectionEvent.CLOSING) self.__closed_future = asyncio.Future() if self.transport and self.transport.is_connected: await self.transport.close() @@ -239,7 +252,7 @@ async def close(self): raise AblyException("Timeout waiting for connection close response", 504, 50003) else: log.warning('ConnectionManager: called close with no connected transport') - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) if self.transport and self.transport.ws_connect_task is not None: try: await self.transport.ws_connect_task @@ -295,6 +308,7 @@ async def ping(self): async def on_protocol_message(self, msg): action = msg['action'] if action == ProtocolMessageAction.CONNECTED: # CONNECTED + msg_error = msg.get("error") if self.transport: self.transport.is_connected = True if self.__connected_future: @@ -307,12 +321,15 @@ async def on_protocol_message(self, msg): if self.__ttl_task: self.__ttl_task.cancel() self.__connection_details = msg['connectionDetails'] - self.enact_state_change(ConnectionState.CONNECTED) + if self.__state == ConnectionState.CONNECTED: + self.enact_state_change(ConnectionState.CONNECTED, ConnectionEvent.UPDATE, msg_error) + else: + self.enact_state_change(ConnectionState.CONNECTED, ConnectionEvent.CONNECTED) if action == ProtocolMessageAction.ERROR: # ERROR error = msg["error"] if error['nonfatal'] is False: exception = AblyAuthException(error["message"], error["statusCode"], error["code"]) - self.enact_state_change(ConnectionState.FAILED, exception) + self.enact_state_change(ConnectionState.FAILED, ConnectionEvent.FAILED, exception) if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None diff --git a/ably/realtime/realtime.py b/ably/realtime/realtime.py index b5d80626..9b744217 100644 --- a/ably/realtime/realtime.py +++ b/ably/realtime/realtime.py @@ -62,8 +62,9 @@ def __init__(self, key=None, loop=None, **kwargs): If the connection is still in the DISCONNECTED state after this delay, the client library will attempt to reconnect automatically. The default is 15 seconds. fallback_hosts: list[str] - An array of fallback hosts to be used in the case of an error necessitating the use of an alternative host. - If you have been provided a set of custom fallback hosts by Ably, please specify them here. + An array of fallback hosts to be used in the case of an error necessitating the use of an + alternative host. If you have been provided a set of custom fallback hosts by Ably, please specify + them here. connection_state_ttl: float The duration that Ably will persist the connection state for when a Realtime client is abruptly disconnected. From ba276073c367e9a16c1150b21146996237e89103 Mon Sep 17 00:00:00 2001 From: Owen Pearson Date: Thu, 12 Jan 2023 16:01:33 +0000 Subject: [PATCH 6/6] blah --- ably/realtime/connection.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index 748056f0..1f98f684 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -73,6 +73,7 @@ def __init__(self, realtime): self.__state = ConnectionState.CONNECTING if realtime.options.auto_connect else ConnectionState.INITIALIZED self.__connection_manager = ConnectionManager(self.__realtime, self.state) self.__connection_manager.on('connectionstate', self._on_state_update) + self.__connection_manager.on('update', self._on_connection_update) super().__init__() async def connect(self): @@ -116,6 +117,9 @@ def _on_state_update(self, state_change): self.__error_reason = state_change.reason self.__realtime.options.loop.call_soon(functools.partial(self._emit, state_change.current, state_change)) + def _on_connection_update(self, state_change): + self.__realtime.options.loop.call_soon(functools.partial(self._emit, ConnectionEvent.UPDATE, state_change)) + @property def state(self): """The current connection state of the connection""" @@ -151,20 +155,20 @@ def __init__(self, realtime, initial_state): self.__in_suspended_state = False super().__init__() - def enact_state_change(self, state, event, reason=None): + def enact_state_change(self, state, reason=None): current_state = self.__state self.__state = state if self.__state == ConnectionState.DISCONNECTED: if not self.__ttl_task or self.__ttl_task.done(): self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) - self._emit('connectionstate', ConnectionStateChange(current_state, state, event, reason)) + self._emit('connectionstate', ConnectionStateChange(current_state, state, state, reason)) async def __connection_state_ttl(self): if self.__connection_details: self.ably.options.connection_state_ttl = self.__connection_details["connectionStateTtl"] await asyncio.sleep(self.ably.options.connection_state_ttl / 1000) exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) - self.enact_state_change(ConnectionState.SUSPENDED, ConnectionEvent.SUSPENDED, exception) + self.enact_state_change(ConnectionState.SUSPENDED, exception) self.__in_suspended_state = True if self.__retry_task: self.__retry_task.cancel() @@ -322,7 +326,12 @@ async def on_protocol_message(self, msg): self.__ttl_task.cancel() self.__connection_details = msg['connectionDetails'] if self.__state == ConnectionState.CONNECTED: - self.enact_state_change(ConnectionState.CONNECTED, ConnectionEvent.UPDATE, msg_error) + state_change = ConnectionStateChange( + ConnectionState.CONNECTED, + ConnectionState.CONNECTED, + ConnectionEvent.UPDATE, + ) + self._emit('update', state_change) else: self.enact_state_change(ConnectionState.CONNECTED, ConnectionEvent.CONNECTED) if action == ProtocolMessageAction.ERROR: # ERROR