2020import ipaddress
2121import logging
2222import socket
23+ import time
24+ from concurrent .futures import ThreadPoolExecutor
25+
2326import socks
2427
2528log = logging .getLogger (__name__ )
@@ -31,12 +34,10 @@ class TCP:
3134 def __init__ (self , ipv6 : bool , proxy : dict ):
3235 self .socket = None
3336
34- self .reader = None
35- self .writer = None
36-
37- self .send_queue = asyncio .Queue ()
38- self .send_task = None
37+ self .reader = None # type: asyncio.StreamReader
38+ self .writer = None # type: asyncio.StreamWriter
3939
40+ self .lock = asyncio .Lock ()
4041 self .loop = asyncio .get_event_loop ()
4142
4243 if proxy :
@@ -62,50 +63,39 @@ def __init__(self, ipv6: bool, proxy: dict):
6263
6364 log .info ("Using proxy %s" , hostname )
6465 else :
65- self .socket = socket . socket (
66+ self .socket = socks . socksocket (
6667 socket .AF_INET6 if ipv6
6768 else socket .AF_INET
6869 )
6970
70- self .socket .setblocking ( False )
71+ self .socket .settimeout ( TCP . TIMEOUT )
7172
7273 async def connect (self , address : tuple ):
73- try :
74- await asyncio . wait_for ( asyncio . get_event_loop (). sock_connect ( self . socket , address ), TCP . TIMEOUT )
75- except asyncio . TimeoutError : # Re-raise as TimeoutError. asyncio.TimeoutError is deprecated in 3.11
76- raise TimeoutError ( "Connection timed out" )
74+ # The socket used by the whole logic is blocking and thus it blocks when connecting.
75+ # Offload the task to a thread executor to avoid blocking the main event loop.
76+ with ThreadPoolExecutor ( 1 ) as executor :
77+ await self . loop . run_in_executor ( executor , self . socket . connect , address )
7778
7879 self .reader , self .writer = await asyncio .open_connection (sock = self .socket )
79- self .send_task = asyncio .create_task (self .send_worker ())
80-
81- async def close (self ):
82- await self .send_queue .put (None )
83-
84- if self .send_task is not None :
85- await self .send_task
8680
81+ def close (self ):
8782 try :
88- if self .writer is not None :
89- self .writer .close ()
90- await asyncio .wait_for (self .writer .wait_closed (), TCP .TIMEOUT )
91- except Exception as e :
92- log .info ("Close exception: %s %s" , type (e ).__name__ , e )
83+ self .writer .close ()
84+ except AttributeError :
85+ try :
86+ self .socket .shutdown (socket .SHUT_RDWR )
87+ except OSError :
88+ pass
89+ finally :
90+ # A tiny sleep placed here helps avoiding .recv(n) hanging until the timeout.
91+ # This is a workaround that seems to fix the occasional delayed stop of a client.
92+ time .sleep (0.001 )
93+ self .socket .close ()
9394
9495 async def send (self , data : bytes ):
95- await self .send_queue .put (data )
96-
97- async def send_worker (self ):
98- while True :
99- data = await self .send_queue .get ()
100-
101- if data is None :
102- break
103-
104- try :
105- self .writer .write (data )
106- await self .writer .drain ()
107- except Exception as e :
108- log .info ("Send exception: %s %s" , type (e ).__name__ , e )
96+ async with self .lock :
97+ self .writer .write (data )
98+ await self .writer .drain ()
10999
110100 async def recv (self , length : int = 0 ):
111101 data = b""
0 commit comments