Skip to content

Commit e270c7d

Browse files
committed
Remove unpack_dispatch_and_ack method
1 parent 7f27c43 commit e270c7d

File tree

1 file changed

+49
-52
lines changed

1 file changed

+49
-52
lines changed

pyrogram/session/session.py

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -259,63 +259,60 @@ def net_worker(self):
259259
break
260260

261261
try:
262-
self.unpack_dispatch_and_ack(packet)
262+
data = self.unpack(BytesIO(packet))
263+
264+
messages = (
265+
data.body.messages
266+
if isinstance(data.body, MsgContainer)
267+
else [data]
268+
)
269+
270+
log.debug(data)
271+
272+
for msg in messages:
273+
if msg.seq_no % 2 != 0:
274+
if msg.msg_id in self.pending_acks:
275+
continue
276+
else:
277+
self.pending_acks.add(msg.msg_id)
278+
279+
if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)):
280+
self.pending_acks.add(msg.body.answer_msg_id)
281+
continue
282+
283+
if isinstance(msg.body, types.NewSessionCreated):
284+
continue
285+
286+
msg_id = None
287+
288+
if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)):
289+
msg_id = msg.body.bad_msg_id
290+
elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)):
291+
msg_id = msg.body.req_msg_id
292+
elif isinstance(msg.body, types.Pong):
293+
msg_id = msg.body.msg_id
294+
else:
295+
if self.client is not None:
296+
self.client.updates_queue.put(msg.body)
297+
298+
if msg_id in self.results:
299+
self.results[msg_id].value = getattr(msg.body, "result", msg.body)
300+
self.results[msg_id].event.set()
301+
302+
if len(self.pending_acks) >= self.ACKS_THRESHOLD:
303+
log.info("Send {} acks".format(len(self.pending_acks)))
304+
305+
try:
306+
self._send(types.MsgsAck(list(self.pending_acks)), False)
307+
except (OSError, TimeoutError):
308+
pass
309+
else:
310+
self.pending_acks.clear()
263311
except Exception as e:
264312
log.error(e, exc_info=True)
265313

266314
log.debug("{} stopped".format(name))
267315

268-
def unpack_dispatch_and_ack(self, packet: bytes):
269-
data = self.unpack(BytesIO(packet))
270-
271-
messages = (
272-
data.body.messages
273-
if isinstance(data.body, MsgContainer)
274-
else [data]
275-
)
276-
277-
log.debug(data)
278-
279-
for msg in messages:
280-
if msg.seq_no % 2 != 0:
281-
if msg.msg_id in self.pending_acks:
282-
continue
283-
else:
284-
self.pending_acks.add(msg.msg_id)
285-
286-
if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)):
287-
self.pending_acks.add(msg.body.answer_msg_id)
288-
continue
289-
290-
if isinstance(msg.body, types.NewSessionCreated):
291-
continue
292-
293-
msg_id = None
294-
295-
if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)):
296-
msg_id = msg.body.bad_msg_id
297-
elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)):
298-
msg_id = msg.body.req_msg_id
299-
elif isinstance(msg.body, types.Pong):
300-
msg_id = msg.body.msg_id
301-
else:
302-
if self.client is not None:
303-
self.client.updates_queue.put(msg.body)
304-
305-
if msg_id in self.results:
306-
self.results[msg_id].value = getattr(msg.body, "result", msg.body)
307-
self.results[msg_id].event.set()
308-
309-
if len(self.pending_acks) >= self.ACKS_THRESHOLD:
310-
log.info("Send {} acks".format(len(self.pending_acks)))
311-
312-
try:
313-
self._send(types.MsgsAck(list(self.pending_acks)), False)
314-
except (OSError, TimeoutError):
315-
pass
316-
else:
317-
self.pending_acks.clear()
318-
319316
def ping(self):
320317
log.debug("PingThread started")
321318

0 commit comments

Comments
 (0)