Skip to content
This repository was archived by the owner on Dec 23, 2024. It is now read-only.

Commit 844e53a

Browse files
committed
Move crypto calls to threads in case of big enough chunks
1 parent 521e403 commit 844e53a

File tree

5 files changed

+34
-49
lines changed

5 files changed

+34
-49
lines changed

pyrogram/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
__license__ = "GNU Lesser General Public License v3 or later (LGPLv3+)"
2121
__copyright__ = "Copyright (C) 2017-2020 Dan <https://github.com/delivrance>"
2222

23+
from concurrent.futures.thread import ThreadPoolExecutor
24+
2325

2426
class StopTransmission(StopAsyncIteration):
2527
pass
@@ -41,3 +43,7 @@ class ContinuePropagation(StopAsyncIteration):
4143

4244
# Save the main thread loop for future references
4345
main_event_loop = asyncio.get_event_loop()
46+
47+
CRYPTO_EXECUTOR_SIZE_THRESHOLD = 512
48+
49+
crypto_executor = ThreadPoolExecutor(2, thread_name_prefix="CryptoWorker")

pyrogram/connection/transport/tcp/tcp.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(self, ipv6: bool, proxy: dict):
4545
self.writer = None # type: asyncio.StreamWriter
4646

4747
self.lock = asyncio.Lock()
48+
self.loop = asyncio.get_event_loop()
4849

4950
if proxy.get("enabled", False):
5051
hostname = proxy.get("hostname", None)

pyrogram/connection/transport/tcp/tcp_abridged_o.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import os
2121

22+
from pyrogram import utils
2223
from pyrogram.crypto import aes
2324
from .tcp import TCP
2425

@@ -55,16 +56,10 @@ async def connect(self, address: tuple):
5556

5657
async def send(self, data: bytes, *args):
5758
length = len(data) // 4
59+
data = (bytes([length]) if length <= 126 else b"\x7f" + length.to_bytes(3, "little")) + data
60+
payload = await utils.maybe_run_in_executor(aes.ctr256_encrypt, data, len(data), self.loop, *self.encrypt)
5861

59-
await super().send(
60-
aes.ctr256_encrypt(
61-
(bytes([length])
62-
if length <= 126
63-
else b"\x7f" + length.to_bytes(3, "little"))
64-
+ data,
65-
*self.encrypt
66-
)
67-
)
62+
await super().send(payload)
6863

6964
async def recv(self, length: int = 0) -> bytes or None:
7065
length = await super().recv(1)
@@ -87,4 +82,4 @@ async def recv(self, length: int = 0) -> bytes or None:
8782
if data is None:
8883
return None
8984

90-
return aes.ctr256_decrypt(data, *self.decrypt)
85+
return await utils.maybe_run_in_executor(aes.ctr256_decrypt, data, len(data), self.loop, *self.decrypt)

pyrogram/session/session.py

Lines changed: 14 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919
import asyncio
2020
import logging
2121
import os
22-
from concurrent.futures.thread import ThreadPoolExecutor
2322
from datetime import datetime, timedelta
2423
from hashlib import sha1
2524
from io import BytesIO
2625

2726
import pyrogram
28-
from pyrogram import __copyright__, __license__, __version__
27+
from pyrogram import __copyright__, __license__, __version__, utils
2928
from pyrogram import raw
3029
from pyrogram.connection import Connection
3130
from pyrogram.crypto import mtproto
@@ -51,7 +50,6 @@ class Session:
5150
MAX_RETRIES = 5
5251
ACKS_THRESHOLD = 8
5352
PING_INTERVAL = 5
54-
EXECUTOR_SIZE_THRESHOLD = 512
5553

5654
notice_displayed = False
5755

@@ -69,8 +67,6 @@ class Session:
6967
64: "[64] invalid container"
7068
}
7169

72-
executor = ThreadPoolExecutor(2, thread_name_prefix="CryptoWorker")
73-
7470
def __init__(
7571
self,
7672
client: "pyrogram.Client",
@@ -220,22 +216,12 @@ async def restart(self):
220216
await self.start()
221217

222218
async def handle_packet(self, packet):
223-
if len(packet) <= self.EXECUTOR_SIZE_THRESHOLD:
224-
data = mtproto.unpack(
225-
BytesIO(packet),
226-
self.session_id,
227-
self.auth_key,
228-
self.auth_key_id
229-
)
230-
else:
231-
data = await self.loop.run_in_executor(
232-
self.executor,
233-
mtproto.unpack,
234-
BytesIO(packet),
235-
self.session_id,
236-
self.auth_key,
237-
self.auth_key_id
238-
)
219+
data = await utils.maybe_run_in_executor(
220+
mtproto.unpack, BytesIO(packet), len(packet), self.loop,
221+
self.session_id,
222+
self.auth_key,
223+
self.auth_key_id
224+
)
239225

240226
messages = (
241227
data.body.messages
@@ -375,24 +361,13 @@ async def _send(self, data: TLObject, wait_response: bool = True, timeout: float
375361
log.debug(f"Sent:")
376362
log.debug(message)
377363

378-
if len(message) <= self.EXECUTOR_SIZE_THRESHOLD:
379-
payload = mtproto.pack(
380-
message,
381-
self.current_salt.salt,
382-
self.session_id,
383-
self.auth_key,
384-
self.auth_key_id
385-
)
386-
else:
387-
payload = await self.loop.run_in_executor(
388-
self.executor,
389-
mtproto.pack,
390-
message,
391-
self.current_salt.salt,
392-
self.session_id,
393-
self.auth_key,
394-
self.auth_key_id
395-
)
364+
payload = await utils.maybe_run_in_executor(
365+
mtproto.pack, message, len(message), self.loop,
366+
self.current_salt.salt,
367+
self.session_id,
368+
self.auth_key,
369+
self.auth_key_id
370+
)
396371

397372
try:
398373
await self.connection.send(payload)

pyrogram/utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,3 +315,11 @@ async def parse_text_entities(
315315
"message": text,
316316
"entities": entities
317317
}
318+
319+
320+
async def maybe_run_in_executor(func, data, length, loop, *args):
321+
return (
322+
func(data, *args)
323+
if length <= pyrogram.CRYPTO_EXECUTOR_SIZE_THRESHOLD
324+
else await loop.run_in_executor(pyrogram.crypto_executor, func, data, *args)
325+
)

0 commit comments

Comments
 (0)