@@ -259,63 +259,60 @@ def net_worker(self):
259259 break
260260
261261 try :
262- self .unpack_dispatch_and_ack (packet )
262+ data = self .unpack (BytesIO (packet ))
263+
264+ messages = (
265+ data .body .messages
266+ if isinstance (data .body , MsgContainer )
267+ else [data ]
268+ )
269+
270+ log .debug (data )
271+
272+ for msg in messages :
273+ if msg .seq_no % 2 != 0 :
274+ if msg .msg_id in self .pending_acks :
275+ continue
276+ else :
277+ self .pending_acks .add (msg .msg_id )
278+
279+ if isinstance (msg .body , (types .MsgDetailedInfo , types .MsgNewDetailedInfo )):
280+ self .pending_acks .add (msg .body .answer_msg_id )
281+ continue
282+
283+ if isinstance (msg .body , types .NewSessionCreated ):
284+ continue
285+
286+ msg_id = None
287+
288+ if isinstance (msg .body , (types .BadMsgNotification , types .BadServerSalt )):
289+ msg_id = msg .body .bad_msg_id
290+ elif isinstance (msg .body , (core .FutureSalts , types .RpcResult )):
291+ msg_id = msg .body .req_msg_id
292+ elif isinstance (msg .body , types .Pong ):
293+ msg_id = msg .body .msg_id
294+ else :
295+ if self .client is not None :
296+ self .client .updates_queue .put (msg .body )
297+
298+ if msg_id in self .results :
299+ self .results [msg_id ].value = getattr (msg .body , "result" , msg .body )
300+ self .results [msg_id ].event .set ()
301+
302+ if len (self .pending_acks ) >= self .ACKS_THRESHOLD :
303+ log .info ("Send {} acks" .format (len (self .pending_acks )))
304+
305+ try :
306+ self ._send (types .MsgsAck (list (self .pending_acks )), False )
307+ except (OSError , TimeoutError ):
308+ pass
309+ else :
310+ self .pending_acks .clear ()
263311 except Exception as e :
264312 log .error (e , exc_info = True )
265313
266314 log .debug ("{} stopped" .format (name ))
267315
268- def unpack_dispatch_and_ack (self , packet : bytes ):
269- data = self .unpack (BytesIO (packet ))
270-
271- messages = (
272- data .body .messages
273- if isinstance (data .body , MsgContainer )
274- else [data ]
275- )
276-
277- log .debug (data )
278-
279- for msg in messages :
280- if msg .seq_no % 2 != 0 :
281- if msg .msg_id in self .pending_acks :
282- continue
283- else :
284- self .pending_acks .add (msg .msg_id )
285-
286- if isinstance (msg .body , (types .MsgDetailedInfo , types .MsgNewDetailedInfo )):
287- self .pending_acks .add (msg .body .answer_msg_id )
288- continue
289-
290- if isinstance (msg .body , types .NewSessionCreated ):
291- continue
292-
293- msg_id = None
294-
295- if isinstance (msg .body , (types .BadMsgNotification , types .BadServerSalt )):
296- msg_id = msg .body .bad_msg_id
297- elif isinstance (msg .body , (core .FutureSalts , types .RpcResult )):
298- msg_id = msg .body .req_msg_id
299- elif isinstance (msg .body , types .Pong ):
300- msg_id = msg .body .msg_id
301- else :
302- if self .client is not None :
303- self .client .updates_queue .put (msg .body )
304-
305- if msg_id in self .results :
306- self .results [msg_id ].value = getattr (msg .body , "result" , msg .body )
307- self .results [msg_id ].event .set ()
308-
309- if len (self .pending_acks ) >= self .ACKS_THRESHOLD :
310- log .info ("Send {} acks" .format (len (self .pending_acks )))
311-
312- try :
313- self ._send (types .MsgsAck (list (self .pending_acks )), False )
314- except (OSError , TimeoutError ):
315- pass
316- else :
317- self .pending_acks .clear ()
318-
319316 def ping (self ):
320317 log .debug ("PingThread started" )
321318
0 commit comments