diff --git a/ably/realtime/connection.py b/ably/realtime/connection.py index b79898da..1f98f684 100644 --- a/ably/realtime/connection.py +++ b/ably/realtime/connection.py @@ -21,12 +21,26 @@ class ConnectionState(str, Enum): CLOSING = 'closing' CLOSED = 'closed' FAILED = 'failed' + SUSPENDED = 'suspended' + + +class ConnectionEvent(str): + INITIALIZED = 'initialized' + CONNECTING = 'connecting' + CONNECTED = 'connected' + DISCONNECTED = 'disconnected' + CLOSING = 'closing' + CLOSED = 'closed' + FAILED = 'failed' + SUSPENDED = 'suspended' + UPDATE = 'update' @dataclass class ConnectionStateChange: previous: ConnectionState current: ConnectionState + event: ConnectionEvent reason: Optional[AblyException] = None @@ -59,6 +73,7 @@ def __init__(self, realtime): self.__state = ConnectionState.CONNECTING if realtime.options.auto_connect else ConnectionState.INITIALIZED self.__connection_manager = ConnectionManager(self.__realtime, self.state) self.__connection_manager.on('connectionstate', self._on_state_update) + self.__connection_manager.on('update', self._on_connection_update) super().__init__() async def connect(self): @@ -102,6 +117,9 @@ def _on_state_update(self, state_change): self.__error_reason = state_change.reason self.__realtime.options.loop.call_soon(functools.partial(self._emit, state_change.current, state_change)) + def _on_connection_update(self, state_change): + self.__realtime.options.loop.call_soon(functools.partial(self._emit, ConnectionEvent.UPDATE, state_change)) + @property def state(self): """The current connection state of the connection""" @@ -131,12 +149,30 @@ def __init__(self, realtime, initial_state): self.__ping_future = None self.__timeout_in_secs = self.options.realtime_request_timeout / 1000 self.transport: WebSocketTransport | None = None + self.__ttl_task = None + self.__retry_task = None + self.__connection_details = None + self.__in_suspended_state = False super().__init__() def enact_state_change(self, state, reason=None): current_state = self.__state self.__state = state - self._emit('connectionstate', ConnectionStateChange(current_state, state, reason)) + if self.__state == ConnectionState.DISCONNECTED: + if not self.__ttl_task or self.__ttl_task.done(): + self.__ttl_task = asyncio.create_task(self.__connection_state_ttl()) + self._emit('connectionstate', ConnectionStateChange(current_state, state, state, reason)) + + async def __connection_state_ttl(self): + if self.__connection_details: + self.ably.options.connection_state_ttl = self.__connection_details["connectionStateTtl"] + await asyncio.sleep(self.ably.options.connection_state_ttl / 1000) + exception = AblyException("Exceeded connectionStateTtl while in DISCONNECTED state", 504, 50003) + self.enact_state_change(ConnectionState.SUSPENDED, exception) + self.__in_suspended_state = True + if self.__retry_task: + self.__retry_task.cancel() + self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def connect(self): if not self.__connected_future: @@ -150,6 +186,8 @@ def try_connect(self): async def _connect(self): if self.__state == ConnectionState.CONNECTED: + if self.__ttl_task: + self.__ttl_task.cancel() return if self.__state == ConnectionState.CONNECTING: @@ -163,7 +201,7 @@ async def _connect(self): log.info('Connection cancelled due to request timeout. Attempting reconnection...') raise exception else: - self.enact_state_change(ConnectionState.CONNECTING) + self.enact_state_change(ConnectionState.CONNECTING, ConnectionEvent.CONNECTING) await self.connect_impl() def on_connection_attempt_done(self, task): @@ -180,28 +218,35 @@ def on_connection_attempt_done(self, task): if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None - self.enact_state_change(ConnectionState.DISCONNECTED, exception) - asyncio.create_task(self.retry_connection_attempt()) + if self.__in_suspended_state: + self.enact_state_change(ConnectionState.SUSPENDED, ConnectionEvent.SUSPENDED, exception) + else: + self.enact_state_change(ConnectionState.DISCONNECTED, ConnectionEvent.DISCONNECTED, exception) + self.__retry_task = asyncio.create_task(self.retry_connection_attempt()) async def retry_connection_attempt(self): - await asyncio.sleep(self.ably.options.disconnected_retry_timeout / 1000) + if self.__in_suspended_state: + retry_timeout = self.ably.options.suspended_retry_timeout / 1000 + else: + retry_timeout = self.ably.options.disconnected_retry_timeout / 1000 + await asyncio.sleep(retry_timeout) self.try_connect() async def close(self): if self.__state in (ConnectionState.CLOSED, ConnectionState.INITIALIZED, ConnectionState.FAILED): - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) return if self.__state is ConnectionState.DISCONNECTED: if self.transport: await self.transport.dispose() self.transport = None - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) return if self.__state != ConnectionState.CONNECTED: log.warning('Connection.closed called while connection state not connected') if self.__state == ConnectionState.CONNECTING: await self.__connected_future - self.enact_state_change(ConnectionState.CLOSING) + self.enact_state_change(ConnectionState.CLOSING, ConnectionEvent.CLOSING) self.__closed_future = asyncio.Future() if self.transport and self.transport.is_connected: await self.transport.close() @@ -211,9 +256,12 @@ async def close(self): raise AblyException("Timeout waiting for connection close response", 504, 50003) else: log.warning('ConnectionManager: called close with no connected transport') - self.enact_state_change(ConnectionState.CLOSED) + self.enact_state_change(ConnectionState.CLOSED, ConnectionEvent.CLOSED) if self.transport and self.transport.ws_connect_task is not None: - await self.transport.ws_connect_task + try: + await self.transport.ws_connect_task + except AblyException as e: + log.warning(f'Connection error encountered while closing: {e}') async def connect_impl(self): self.transport = WebSocketTransport(self) @@ -264,6 +312,7 @@ async def ping(self): async def on_protocol_message(self, msg): action = msg['action'] if action == ProtocolMessageAction.CONNECTED: # CONNECTED + msg_error = msg.get("error") if self.transport: self.transport.is_connected = True if self.__connected_future: @@ -272,12 +321,24 @@ async def on_protocol_message(self, msg): self.__connected_future = None else: log.warn('CONNECTED message received but connected_future not set') - self.enact_state_change(ConnectionState.CONNECTED) + self.__in_suspended_state = False + if self.__ttl_task: + self.__ttl_task.cancel() + self.__connection_details = msg['connectionDetails'] + if self.__state == ConnectionState.CONNECTED: + state_change = ConnectionStateChange( + ConnectionState.CONNECTED, + ConnectionState.CONNECTED, + ConnectionEvent.UPDATE, + ) + self._emit('update', state_change) + else: + self.enact_state_change(ConnectionState.CONNECTED, ConnectionEvent.CONNECTED) if action == ProtocolMessageAction.ERROR: # ERROR error = msg["error"] if error['nonfatal'] is False: exception = AblyAuthException(error["message"], error["statusCode"], error["code"]) - self.enact_state_change(ConnectionState.FAILED, exception) + self.enact_state_change(ConnectionState.FAILED, ConnectionEvent.FAILED, exception) if self.__connected_future: self.__connected_future.set_exception(exception) self.__connected_future = None @@ -310,3 +371,7 @@ def ably(self): @property def state(self): return self.__state + + @property + def connection_details(self): + return self.__connection_details diff --git a/ably/realtime/realtime.py b/ably/realtime/realtime.py index 75e3270a..9b744217 100644 --- a/ably/realtime/realtime.py +++ b/ably/realtime/realtime.py @@ -62,8 +62,15 @@ def __init__(self, key=None, loop=None, **kwargs): If the connection is still in the DISCONNECTED state after this delay, the client library will attempt to reconnect automatically. The default is 15 seconds. fallback_hosts: list[str] - An array of fallback hosts to be used in the case of an error necessitating the use of an alternative host. - If you have been provided a set of custom fallback hosts by Ably, please specify them here. + An array of fallback hosts to be used in the case of an error necessitating the use of an + alternative host. If you have been provided a set of custom fallback hosts by Ably, please specify + them here. + connection_state_ttl: float + The duration that Ably will persist the connection state for when a Realtime client is abruptly + disconnected. + suspended_retry_timeout: float + When the connection enters the SUSPENDED state, after this delay, if the state is still SUSPENDED, + the client library attempts to reconnect automatically. The default is 30 seconds. Raises ------ ValueError diff --git a/ably/transport/defaults.py b/ably/transport/defaults.py index 6b0fec88..915d3ef8 100644 --- a/ably/transport/defaults.py +++ b/ably/transport/defaults.py @@ -20,7 +20,9 @@ class Defaults: comet_recv_timeout = 90000 comet_send_timeout = 10000 realtime_request_timeout = 10000 - disconnected_retry_timeout = 1500 + disconnected_retry_timeout = 15000 + connection_state_ttl = 120000 + suspended_retry_timeout = 30000 transports = [] # ["web_socket", "comet"] diff --git a/ably/types/options.py b/ably/types/options.py index 0a926992..70b79b40 100644 --- a/ably/types/options.py +++ b/ably/types/options.py @@ -9,14 +9,13 @@ class Options(AuthOptions): - def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, - realtime_host=None, port=0, tls_port=0, use_binary_protocol=True, - queue_messages=False, recover=False, environment=None, + def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0, + tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None, http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None, http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None, fallback_hosts_use_default=None, fallback_retry_timeout=None, disconnected_retry_timeout=None, - idempotent_rest_publishing=None, loop=None, auto_connect=True, - **kwargs): + idempotent_rest_publishing=None, loop=None, auto_connect=True, connection_state_ttl=None, + suspended_retry_timeout=None, **kwargs): super().__init__(**kwargs) # TODO check these defaults @@ -29,6 +28,12 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, if disconnected_retry_timeout is None: disconnected_retry_timeout = Defaults.disconnected_retry_timeout + if connection_state_ttl is None: + connection_state_ttl = Defaults.connection_state_ttl + + if suspended_retry_timeout is None: + suspended_retry_timeout = Defaults.suspended_retry_timeout + if environment is not None and rest_host is not None: raise ValueError('specify rest_host or environment, not both') @@ -62,6 +67,8 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, self.__idempotent_rest_publishing = idempotent_rest_publishing self.__loop = loop self.__auto_connect = auto_connect + self.__connection_state_ttl = connection_state_ttl + self.__suspended_retry_timeout = suspended_retry_timeout self.__rest_hosts = self.__get_rest_hosts() self.__realtime_hosts = self.__get_realtime_hosts() @@ -214,6 +221,18 @@ def loop(self): def auto_connect(self): return self.__auto_connect + @property + def connection_state_ttl(self): + return self.__connection_state_ttl + + @connection_state_ttl.setter + def connection_state_ttl(self, value): + self.__connection_state_ttl = value + + @property + def suspended_retry_timeout(self): + return self.__suspended_retry_timeout + def __get_rest_hosts(self): """ Return the list of hosts as they should be tried. First comes the main diff --git a/test/ably/realtimeconnection_test.py b/test/ably/realtimeconnection_test.py index 86883f25..3521e6bb 100644 --- a/test/ably/realtimeconnection_test.py +++ b/test/ably/realtimeconnection_test.py @@ -206,6 +206,7 @@ async def test_unroutable_host(self): assert exception.value.status_code == 504 assert ably.connection.state == ConnectionState.DISCONNECTED assert ably.connection.error_reason == exception.value + await ably.close() async def test_invalid_host(self): ably = await RestSetup.get_ably_realtime(realtime_host="iamnotahost") @@ -215,3 +216,25 @@ async def test_invalid_host(self): assert exception.value.status_code == 400 assert ably.connection.state == ConnectionState.DISCONNECTED assert ably.connection.error_reason == exception.value + await ably.close() + + async def test_connection_state_ttl(self): + ably = await RestSetup.get_ably_realtime(realtime_host="iamnotahost", connection_state_ttl=2000) + changes = [] + suspended_future = asyncio.Future() + + def on_state_change(state_change): + changes.append(state_change) + if state_change.current == ConnectionState.SUSPENDED: + suspended_future.set_result(None) + with pytest.raises(AblyException) as exception: + await ably.connect() + ably.connection.on(on_state_change) + assert exception.value.code == 40000 + assert exception.value.status_code == 400 + assert ably.connection.state == ConnectionState.DISCONNECTED + await suspended_future + assert ably.connection.state == changes[-1].current + assert ably.connection.state == ConnectionState.SUSPENDED + assert ably.connection.error_reason == changes[-1].reason + await ably.close()