diff --git a/README.rst b/README.rst index 953c4a0..5b6a455 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,7 @@ python-proxy ============ -|made-with-python| |PyPI-version| |Hit-Count| |Downloads| +|made-with-python| |PyPI-version| |Hit-Count| |Downloads| |Downloads-month| |Downloads-week| .. |made-with-python| image:: https://img.shields.io/badge/Made%20with-Python-1f425f.svg :target: https://www.python.org/ @@ -11,8 +11,12 @@ python-proxy :target: https://pypi.python.org/pypi/pproxy/ .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy +.. |Downloads-month| image:: https://pepy.tech/badge/pproxy/month + :target: https://pepy.tech/project/pproxy +.. |Downloads-week| image:: https://pepy.tech/badge/pproxy/week + :target: https://pepy.tech/project/pproxy -HTTP/Socks4/Socks5/Shadowsocks/ShadowsocksR/SSH/Redirect/Pf TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. +HTTP/HTTP2/HTTP3/Socks4/Socks5/Shadowsocks/SSH/Redirect/Pf/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. QuickStart ---------- @@ -75,6 +79,7 @@ Features - Incoming traffic auto-detect. - Tunnel/jump/backward-jump support. - Unix domain socket support. +- HTTP v2, HTTP v3 (QUIC) - User/password authentication support. - Filter/block hostname by regex patterns. - SSL/TLS client/server support. @@ -98,6 +103,10 @@ Protocols | http | | ✔ | | | httponly:// | | (get,post,etc) | | | | | (as client) | +-------------------+------------+------------+------------+------------+--------------+ +| http v2 (connect) | ✔ | ✔ | | | h2:// | ++-------------------+------------+------------+------------+------------+--------------+ +| http v3 (connect) | ✔ by UDP | ✔ by UDP | | | h3:// | ++-------------------+------------+------------+------------+------------+--------------+ | https | ✔ | ✔ | | | http+ssl:// | +-------------------+------------+------------+------------+------------+--------------+ | socks4 | ✔ | ✔ | | | socks4:// | @@ -116,7 +125,7 @@ Protocols +-------------------+------------+------------+------------+------------+--------------+ | ssh tunnel | | ✔ | | | ssh:// | +-------------------+------------+------------+------------+------------+--------------+ -| quic | ✔ | ✔ | ✔ | ✔ | http+quic:// | +| quic | ✔ by UDP | ✔ by UDP | ✔ | ✔ | http+quic:// | +-------------------+------------+------------+------------+------------+--------------+ | iptables nat | ✔ | | | | redir:// | +-------------------+------------+------------+------------+------------+--------------+ @@ -717,16 +726,17 @@ Examples - QUIC protocol example - QUIC is a UDP stream protocol in HTTP/3. Library **aioquic** is required if you want to proxy via QUIC. + QUIC is a UDP stream protocol used in HTTP/3. Library **aioquic** is required if you want to proxy via QUIC. + QUIC is listened on UDP port, but can handle TCP or UDP traffic. If you want to handle TCP traffic, you should use "-l quic+http" instead of "-ul quic+http". .. code:: rst $ pip3 install aioquic - $ pproxy --ssl ssl.crt,ssl.key -l quic://:1234 + $ pproxy --ssl ssl.crt,ssl.key -l quic+http://:1234 On the client: - $ pproxy -r quic://server:1234 + $ pproxy -r quic+http://server:1234 QUIC protocol can transfer a lot of TCP streams on one single UDP stream. If the connection number is hugh, QUIC can benefit by reducing TCP handshake time. diff --git a/pproxy/admin.py b/pproxy/admin.py new file mode 100644 index 0000000..6428df8 --- /dev/null +++ b/pproxy/admin.py @@ -0,0 +1,39 @@ +import json +import asyncio +config = {} + + +async def reply_http(reply, ver, code, content): + await reply(code, f'{ver} {code}\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(content)}\r\n\r\n'.encode(), content, True) + + +async def status_handler(reply, **kwarg): + method = kwarg.get('method') + if method == 'GET': + data = {"status": "ok"} + value = json.dumps(data).encode() + ver = kwarg.get('ver') + await reply_http(reply, ver, '200 OK', value) + + +async def configs_handler(reply, **kwarg): + method = kwarg.get('method') + ver = kwarg.get('ver') + + if method == 'GET': + data = {"argv": config['argv']} + value = json.dumps(data).encode() + await reply_http(reply, ver, '200 OK', value) + elif method == 'POST': + config['argv'] = kwarg.get('content').decode().split(' ') + config['reload'] = True + data = {"result": 'ok'} + value = json.dumps(data).encode() + await reply_http(reply, ver, '200 OK', value) + raise KeyboardInterrupt + + +httpget = { + '/status': status_handler, + '/configs': configs_handler, +} diff --git a/pproxy/cipher.py b/pproxy/cipher.py index 12af3ce..2dfbd8e 100644 --- a/pproxy/cipher.py +++ b/pproxy/cipher.py @@ -43,6 +43,7 @@ def setup_iv(self, iv=None): self._buffer = bytearray() self._declen = None self.setup() + return self @property def nonce(self): ret = self._nonce.to_bytes(self.NONCE_LENGTH, 'little') diff --git a/pproxy/proto.py b/pproxy/proto.py index 7c66d5e..9d58bb9 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -1,10 +1,10 @@ import asyncio, socket, urllib.parse, time, re, base64, hmac, struct, hashlib, io, os - +from . import admin HTTP_LINE = re.compile('([^ ]+) +(.+?) +(HTTP/[^ ]+)$') packstr = lambda s, n=1: len(s).to_bytes(n, 'big') + s def netloc_split(loc, default_host=None, default_port=None): - ipv6 = re.fullmatch('\[([0-9a-fA-F:]*)\](?::(\d+)?)?', loc) + ipv6 = re.fullmatch(r'\[([0-9a-fA-F:]*)\](?::(\d+)?)?', loc) if ipv6: host_name, port = ipv6.groups() elif ':' in loc: @@ -283,45 +283,53 @@ async def guess(self, reader, **kw): header = await reader.read_w(4) reader.rollback(header) return header in (b'GET ', b'HEAD', b'POST', b'PUT ', b'DELE', b'CONN', b'OPTI', b'TRAC', b'PATC') - async def accept(self, reader, user, writer, users, authtable, httpget=None, **kw): + async def accept(self, reader, user, writer, **kw): lines = await reader.read_until(b'\r\n\r\n') headers = lines[:-4].decode().split('\r\n') method, path, ver = HTTP_LINE.match(headers.pop(0)).groups() lines = '\r\n'.join(i for i in headers if not i.startswith('Proxy-')) headers = dict(i.split(': ', 1) for i in headers if ': ' in i) + async def reply(code, message, body=None, wait=False): + writer.write(message) + if body: + writer.write(body) + if wait: + await writer.drain() + return await self.http_accept(user, method, path, None, ver, lines, headers.get('Host', ''), headers.get('Proxy-Authorization'), reply, **kw) + async def http_accept(self, user, method, path, authority, ver, lines, host, pauth, reply, authtable, users, httpget=None, **kw): url = urllib.parse.urlparse(path) - if method == 'GET' and not url.hostname and httpget: - for path, text in httpget.items(): + if method == 'GET' and not url.hostname: + for path, text in (httpget.items() if httpget else ()): if path == url.path: user = next(filter(lambda x: x.decode()==url.query, users), None) if users else True if user: if users: authtable.set_authed(user) if type(text) is str: - text = (text % dict(host=headers["Host"])).encode() - writer.write(f'{ver} 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(text)}\r\n\r\n'.encode() + text) - await writer.drain() + text = (text % dict(host=host)).encode() + await reply(200, f'{ver} 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(text)}\r\n\r\n'.encode(), text, True) raise Exception('Connection closed') raise Exception(f'404 {method} {url.path}') if users: - pauth = headers.get('Proxy-Authorization', None) user = authtable.authed() if not user: user = next(filter(lambda i: ('Basic '+base64.b64encode(i).decode()) == pauth, users), None) if user is None: - writer.write(f'{ver} 407 Proxy Authentication Required\r\nConnection: close\r\nProxy-Authenticate: Basic realm="simple"\r\n\r\n'.encode()) + await reply(407, f'{ver} 407 Proxy Authentication Required\r\nConnection: close\r\nProxy-Authenticate: Basic realm="simple"\r\n\r\n'.encode(), wait=True) raise Exception('Unauthorized HTTP') authtable.set_authed(user) if method == 'CONNECT': - host_name, port = netloc_split(path) - return user, host_name, port, f'{ver} 200 OK\r\nConnection: close\r\n\r\n'.encode() + host_name, port = netloc_split(authority or path) + return user, host_name, port, lambda writer: reply(200, f'{ver} 200 Connection established\r\nConnection: close\r\n\r\n'.encode()) else: - url = urllib.parse.urlparse(path) - host_name, port = netloc_split(url.netloc or headers.get("Host"), default_port=80) + host_name, port = netloc_split(url.netloc or host, default_port=80) newpath = url._replace(netloc='', scheme='').geturl() - return user, host_name, port, b'', f'{method} {newpath} {ver}\r\n{lines}\r\n\r\n'.encode() - async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): - writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1\r\nHost: {myhost}'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n') + async def connected(writer): + writer.write(f'{method} {newpath} {ver}\r\n{lines}\r\n\r\n'.encode()) + return True + return user, host_name, port, connected + async def connect(self, reader_remote, writer_remote, rauth, host_name, port, **kw): + writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1\r\nHost: {host_name}:{port}'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n') await reader_remote.read_until(b'\r\n\r\n') async def http_channel(self, reader, writer, stat_bytes, stat_conn): try: @@ -370,6 +378,64 @@ def write(data, o=writer_remote.write): return o(data) writer_remote.write = write +class H2(HTTP): + async def guess(self, reader, **kw): + return True + async def accept(self, reader, user, writer, **kw): + if not writer.headers.done(): + await writer.headers + headers = writer.headers.result() + headers = {i.decode().lower():j.decode() for i,j in headers} + lines = '\r\n'.join(i for i in headers if not i.startswith('proxy-') and not i.startswith(':')) + async def reply(code, message, body=None, wait=False): + writer.send_headers(((':status', str(code)),)) + if body: + writer.write(body) + if wait: + await writer.drain() + return await self.http_accept(user, headers[':method'], headers[':path'], headers[':authority'], '2.0', lines, '', headers.get('proxy-authorization'), reply, **kw) + async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): + headers = [(':method', 'CONNECT'), (':scheme', 'https'), (':path', '/'), + (':authority', f'{host_name}:{port}')] + if rauth: + headers.append(('proxy-authorization', 'Basic '+base64.b64encode(rauth))) + writer_remote.send_headers(headers) + +class H3(H2): + pass + + +class HTTPAdmin(HTTP): + async def accept(self, reader, user, writer, **kw): + lines = await reader.read_until(b'\r\n\r\n') + headers = lines[:-4].decode().split('\r\n') + method, path, ver = HTTP_LINE.match(headers.pop(0)).groups() + lines = '\r\n'.join(i for i in headers if not i.startswith('Proxy-')) + headers = dict(i.split(': ', 1) for i in headers if ': ' in i) + async def reply(code, message, body=None, wait=False): + writer.write(message) + if body: + writer.write(body) + if wait: + await writer.drain() + + content_length = int(headers.get('Content-Length','0')) + content = '' + if content_length > 0: + content = await reader.read_n(content_length) + + url = urllib.parse.urlparse(path) + if url.hostname is not None: + raise Exception(f'HTTP Admin Unsupported hostname') + if method in ["GET", "POST", "PUT", "PATCH", "DELETE"]: + for path, handler in admin.httpget.items(): + if path == url.path: + await handler(reply=reply, ver=ver, method=method, headers=headers, lines=lines, content=content) + raise Exception('Connection closed') + raise Exception(f'404 {method} {url.path}') + raise Exception(f'405 {method} not allowed') + + class SSH(BaseProtocol): async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): pass @@ -521,8 +587,8 @@ async def accept(protos, reader, **kw): raise Exception('Connection closed') if user: ret = await proto.accept(reader, user, **kw) - while len(ret) < 5: - ret += (b'',) + while len(ret) < 4: + ret += (None,) return (proto,) + ret raise Exception(f'Unsupported protocol') @@ -533,7 +599,7 @@ def udp_accept(protos, data, **kw): return (proto,) + ret raise Exception(f'Unsupported protocol {data[:10]}') -MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, ssl='', secure='', quic='') +MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, httpadmin=HTTPAdmin, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, h3=H3, ssl='', secure='', quic='') MAPPINGS['in'] = '' def get_protos(rawprotos): @@ -553,7 +619,7 @@ def get_protos(rawprotos): def sslwrap(reader, writer, sslcontext, server_side=False, server_hostname=None, verbose=None): if sslcontext is None: return reader, writer - ssl_reader = type(reader)() + ssl_reader = asyncio.StreamReader() class Protocol(asyncio.Protocol): def data_received(self, data): ssl_reader.feed_data(data) @@ -576,17 +642,27 @@ def close(self): def _force_close(self, exc): if not self.closed: (verbose or print)(f'{exc} from {writer.get_extra_info("peername")[0]}') + ssl._app_transport._closed = True self.close() def abort(self): self.close() ssl.connection_made(Transport()) async def channel(): + read_size=65536 + buffer=None + if hasattr(ssl,'get_buffer'): + buffer=ssl.get_buffer(read_size) try: - while not reader.at_eof() and not ssl._app_transport.closed: - data = await reader.read(65536) + while not reader.at_eof() and not ssl._app_transport._closed: + data = await reader.read(read_size) if not data: break - ssl.data_received(data) + if buffer!=None: + data_len=len(data) + buffer[:data_len]=data + ssl.buffer_updated(data_len) + else: + ssl.data_received(data) except Exception: pass finally: @@ -600,7 +676,8 @@ def write(self, data): def drain(self): return writer.drain() def is_closing(self): - return ssl._app_transport.closed + return ssl._app_transport._closed def close(self): - ssl._app_transport.close() + if not ssl._app_transport._closed: + ssl._app_transport.close() return ssl_reader, Writer() diff --git a/pproxy/server.py b/pproxy/server.py index 342102b..f7c82be 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -1,5 +1,7 @@ -import argparse, time, re, asyncio, functools, base64, random, urllib.parse, socket +import argparse, time, re, asyncio, functools, base64, random, urllib.parse, socket, sys from . import proto +from . import admin + from .__doc__ import * SOCKET_TIMEOUT = 60 @@ -71,7 +73,7 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, s remote_text = f'{remote_ip}:{remote_port}' local_addr = None if server_ip in ('127.0.0.1', '::1', None) else (server_ip, 0) reader_cipher, _ = await prepare_ciphers(cipher, reader, writer, server_side=False) - lproto, user, host_name, port, lbuf, rbuf = await proto.accept(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip, authtime), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs) + lproto, user, host_name, port, client_connected = await proto.accept(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip, authtime), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs) if host_name == 'echo': asyncio.ensure_future(lproto.channel(reader, writer, DUMMY, DUMMY)) elif host_name == 'empty': @@ -87,13 +89,12 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, s raise Exception(f'Connection timeout {roption.bind}') try: reader_remote, writer_remote = await roption.prepare_connection(reader_remote, writer_remote, host_name, port) - writer.write(lbuf) - writer_remote.write(rbuf) + use_http = (await client_connected(writer_remote)) if client_connected else None except Exception: writer_remote.close() raise Exception('Unknown remote protocol') m = modstat(user, remote_ip, host_name) - lchannel = lproto.http_channel if rbuf else lproto.channel + lchannel = lproto.http_channel if use_http else lproto.channel asyncio.ensure_future(lproto.channel(reader_remote, writer, m(2+roption.direct), m(4+roption.direct))) asyncio.ensure_future(lchannel(reader, writer_remote, m(roption.direct), roption.connection_change)) except Exception as ex: @@ -269,7 +270,7 @@ def auth(self): return self.users[0] if self.users else b'' def udp_packet_unpack(self, data): data = self.cipher.datagram.decrypt(data) if self.cipher else data - return self.rproto.udp_unpack(data) + return self.jump.udp_packet_unpack(self.rproto.udp_unpack(data)) def destination(self, host, port): return self.host_name, self.port def udp_prepare_connection(self, host, port, data): @@ -304,6 +305,126 @@ def start_server(self, args, stream_handler=stream_handler): else: return asyncio.start_server(handler, host=self.host_name, port=self.port, reuse_port=args.get('ruport')) +class ProxyH2(ProxySimple): + def __init__(self, sslserver, sslclient, **kw): + super().__init__(sslserver=None, sslclient=None, **kw) + self.handshake = None + self.h2sslserver = sslserver + self.h2sslclient = sslclient + async def handler(self, reader, writer, client_side=True, stream_handler=None, **kw): + import h2.connection, h2.config, h2.events + reader, writer = proto.sslwrap(reader, writer, self.h2sslclient if client_side else self.h2sslserver, not client_side, None) + config = h2.config.H2Configuration(client_side=client_side) + conn = h2.connection.H2Connection(config=config) + streams = {} + conn.initiate_connection() + writer.write(conn.data_to_send()) + while not reader.at_eof() and not writer.is_closing(): + try: + data = await reader.read(65636) + if not data: + break + events = conn.receive_data(data) + except Exception: + pass + writer.write(conn.data_to_send()) + for event in events: + if isinstance(event, h2.events.RequestReceived) and not client_side: + if event.stream_id not in streams: + stream_reader, stream_writer = self.get_stream(conn, writer, event.stream_id) + streams[event.stream_id] = (stream_reader, stream_writer) + asyncio.ensure_future(stream_handler(stream_reader, stream_writer)) + else: + stream_reader, stream_writer = streams[event.stream_id] + stream_writer.headers.set_result(event.headers) + elif isinstance(event, h2.events.SettingsAcknowledged) and client_side: + self.handshake.set_result((conn, streams, writer)) + elif isinstance(event, h2.events.DataReceived): + stream_reader, stream_writer = streams[event.stream_id] + stream_reader.feed_data(event.data) + conn.acknowledge_received_data(len(event.data), event.stream_id) + writer.write(conn.data_to_send()) + elif isinstance(event, h2.events.StreamEnded) or isinstance(event, h2.events.StreamReset): + stream_reader, stream_writer = streams[event.stream_id] + stream_reader.feed_eof() + if not stream_writer.closed: + stream_writer.close() + elif isinstance(event, h2.events.ConnectionTerminated): + break + elif isinstance(event, h2.events.WindowUpdated): + if event.stream_id in streams: + stream_reader, stream_writer = streams[event.stream_id] + stream_writer.window_update() + writer.write(conn.data_to_send()) + writer.close() + def get_stream(self, conn, writer, stream_id): + reader = asyncio.StreamReader() + write_buffer = bytearray() + write_wait = asyncio.Event() + write_full = asyncio.Event() + class StreamWriter(): + def __init__(self): + self.closed = False + self.headers = asyncio.get_event_loop().create_future() + def get_extra_info(self, key): + return writer.get_extra_info(key) + def write(self, data): + write_buffer.extend(data) + write_wait.set() + def drain(self): + writer.write(conn.data_to_send()) + return writer.drain() + def is_closing(self): + return self.closed + def close(self): + self.closed = True + write_wait.set() + def window_update(self): + write_full.set() + def send_headers(self, headers): + conn.send_headers(stream_id, headers) + writer.write(conn.data_to_send()) + stream_writer = StreamWriter() + async def write_job(): + while not stream_writer.closed: + while len(write_buffer) > 0: + while conn.local_flow_control_window(stream_id) <= 0: + write_full.clear() + await write_full.wait() + if stream_writer.closed: + break + chunk_size = min(conn.local_flow_control_window(stream_id), len(write_buffer), conn.max_outbound_frame_size) + conn.send_data(stream_id, write_buffer[:chunk_size]) + writer.write(conn.data_to_send()) + del write_buffer[:chunk_size] + if not stream_writer.closed: + write_wait.clear() + await write_wait.wait() + conn.send_data(stream_id, b'', end_stream=True) + writer.write(conn.data_to_send()) + asyncio.ensure_future(write_job()) + return reader, stream_writer + async def wait_h2_connection(self, local_addr, family): + if self.handshake is not None: + if not self.handshake.done(): + await self.handshake + else: + self.handshake = asyncio.get_event_loop().create_future() + reader, writer = await super().wait_open_connection(None, None, local_addr, family) + asyncio.ensure_future(self.handler(reader, writer)) + await self.handshake + return self.handshake.result() + async def wait_open_connection(self, host, port, local_addr, family): + conn, streams, writer = await self.wait_h2_connection(local_addr, family) + stream_id = conn.get_next_available_stream_id() + conn._begin_new_stream(stream_id, stream_id%2) + stream_reader, stream_writer = self.get_stream(conn, writer, stream_id) + streams[stream_id] = (stream_reader, stream_writer) + return stream_reader, stream_writer + def start_server(self, args, stream_handler=stream_handler): + handler = functools.partial(stream_handler, **vars(self), **args) + return super().start_server(args, functools.partial(self.handler, client_side=False, stream_handler=handler)) + class ProxyQUIC(ProxySimple): def __init__(self, quicserver, quicclient, **kw): super().__init__(**kw) @@ -389,6 +510,93 @@ def handler(reader, writer): asyncio.ensure_future(stream_handler(reader, writer, **vars(self), **args)) return aioquic.asyncio.serve(self.host_name, self.port, configuration=self.quicserver, stream_handler=handler) +class ProxyH3(ProxyQUIC): + def get_stream(self, conn, stream_id): + remote_addr = conn._quic._network_paths[0].addr + reader = asyncio.StreamReader() + class StreamWriter(): + def __init__(self): + self.closed = False + self.headers = asyncio.get_event_loop().create_future() + def get_extra_info(self, key): + return dict(peername=remote_addr, sockname=remote_addr).get(key) + def write(self, data): + conn.http.send_data(stream_id, data, False) + conn.transmit() + async def drain(self): + conn.transmit() + def is_closing(self): + return self.closed + def close(self): + if not self.closed: + conn.http.send_data(stream_id, b'', True) + conn.transmit() + conn.close_stream(stream_id) + self.closed = True + def send_headers(self, headers): + conn.http.send_headers(stream_id, [(i.encode(), j.encode()) for i, j in headers]) + conn.transmit() + return reader, StreamWriter() + def get_protocol(self, server_side=False, handler=None): + import aioquic.asyncio, aioquic.quic.events, aioquic.h3.connection, aioquic.h3.events + class Protocol(aioquic.asyncio.QuicConnectionProtocol): + def __init__(s, *args, **kw): + super().__init__(*args, **kw) + s.http = aioquic.h3.connection.H3Connection(s._quic) + s.streams = {} + def quic_event_received(s, event): + if not server_side: + if isinstance(event, aioquic.quic.events.HandshakeCompleted): + self.handshake.set_result(s) + elif isinstance(event, aioquic.quic.events.ConnectionTerminated): + self.handshake = None + self.quic_egress_acm = None + if s.http is not None: + for http_event in s.http.handle_event(event): + s.http_event_received(http_event) + def http_event_received(s, event): + if isinstance(event, aioquic.h3.events.HeadersReceived): + if event.stream_id not in s.streams and server_side: + reader, writer = s.create_stream(event.stream_id) + writer.headers.set_result(event.headers) + asyncio.ensure_future(handler(reader, writer)) + elif isinstance(event, aioquic.h3.events.DataReceived) and event.stream_id in s.streams: + reader, writer = s.streams[event.stream_id] + if event.data: + reader.feed_data(event.data) + if event.stream_ended: + reader.feed_eof() + s.close_stream(event.stream_id) + def create_stream(s, stream_id=None): + if stream_id is None: + stream_id = s._quic.get_next_available_stream_id(False) + s._quic._get_or_create_stream_for_send(stream_id) + reader, writer = self.get_stream(s, stream_id) + s.streams[stream_id] = (reader, writer) + return reader, writer + def close_stream(s, stream_id): + if stream_id in s.streams: + reader, writer = s.streams[stream_id] + if reader.at_eof() and writer.is_closing(): + s.streams.pop(stream_id) + return Protocol + async def wait_h3_connection(self): + if self.handshake is not None: + if not self.handshake.done(): + await self.handshake + else: + import aioquic.asyncio + self.handshake = asyncio.get_event_loop().create_future() + self.quic_egress_acm = aioquic.asyncio.connect(self.host_name, self.port, create_protocol=self.get_protocol(), configuration=self.quicclient) + conn = await self.quic_egress_acm.__aenter__() + await self.handshake + async def wait_open_connection(self, *args): + await self.wait_h3_connection() + return self.handshake.result().create_stream() + def start_server(self, args, stream_handler=stream_handler): + import aioquic.asyncio + return aioquic.asyncio.serve(self.host_name, self.port, configuration=self.quicserver, create_protocol=self.get_protocol(True, functools.partial(stream_handler, **vars(self), **args))) + class ProxySSH(ProxySimple): def __init__(self, **kw): super().__init__(**kw) @@ -409,7 +617,7 @@ async def channel(): writer.get_extra_info = dict(peername=remote_addr, sockname=remote_addr).get return reader, writer async def wait_ssh_connection(self, local_addr=None, family=0, tunnel=None): - if self.sshconn is not None: + if self.sshconn is not None and not self.sshconn.cancelled(): if not self.sshconn.done(): await self.sshconn else: @@ -427,18 +635,24 @@ async def wait_ssh_connection(self, local_addr=None, family=0, tunnel=None): conn = await asyncssh.connect(host=self.host_name, port=self.port, local_addr=local_addr, family=family, x509_trusted_certs=None, known_hosts=None, username=username, password=password, client_keys=client_keys, keepalive_interval=60, tunnel=tunnel) self.sshconn.set_result(conn) async def wait_open_connection(self, host, port, local_addr, family, tunnel=None): - await self.wait_ssh_connection(local_addr, family, tunnel) - conn = self.sshconn.result() - if isinstance(self.jump, ProxySSH): - reader, writer = await self.jump.wait_open_connection(host, port, None, None, conn) - else: - host, port = self.jump.destination(host, port) - if self.jump.unix: - reader, writer = await conn.open_unix_connection(self.jump.bind) + try: + await self.wait_ssh_connection(local_addr, family, tunnel) + conn = self.sshconn.result() + if isinstance(self.jump, ProxySSH): + reader, writer = await self.jump.wait_open_connection(host, port, None, None, conn) else: - reader, writer = await conn.open_connection(host, port) - reader, writer = self.patch_stream(reader, writer, host, port) - return reader, writer + host, port = self.jump.destination(host, port) + if self.jump.unix: + reader, writer = await conn.open_unix_connection(self.jump.bind) + else: + reader, writer = await conn.open_connection(host, port) + reader, writer = self.patch_stream(reader, writer, host, port) + return reader, writer + except Exception as ex: + if not self.sshconn.done(): + self.sshconn.set_exception(ex) + self.sshconn = None + raise async def start_server(self, args, stream_handler=stream_handler, tunnel=None): if type(self.jump) is ProxyDirect: raise Exception('ssh server mode unsupported') @@ -544,11 +758,14 @@ def proxies_by_uri(uri_jumps): jump = proxy_by_uri(uri, jump) return jump +sslcontexts = [] + def proxy_by_uri(uri, jump): scheme, _, uri = uri.partition('://') url = urllib.parse.urlparse('s://'+uri) rawprotos = [i.lower() for i in scheme.split('+')] err_str, protos = proto.get_protos(rawprotos) + protonames = [i.name for i in protos] if err_str: raise argparse.ArgumentTypeError(err_str) if 'ssl' in rawprotos or 'secure' in rawprotos: @@ -558,18 +775,25 @@ def proxy_by_uri(uri, jump): if 'ssl' in rawprotos: sslclient.check_hostname = False sslclient.verify_mode = ssl.CERT_NONE + sslcontexts.append(sslserver) + sslcontexts.append(sslclient) else: sslserver = sslclient = None - if 'quic' in rawprotos: + if 'quic' in rawprotos or 'h3' in protonames: try: import ssl, aioquic.quic.configuration except Exception: raise Exception('Missing library: "pip3 install aioquic"') - import logging - quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False) - quicclient = aioquic.quic.configuration.QuicConfiguration() + quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**60, max_data=2**60, idle_timeout=SOCKET_TIMEOUT) + quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**60, max_data=2**60, idle_timeout=SOCKET_TIMEOUT*5) quicclient.verify_mode = ssl.CERT_NONE - protonames = [i.name for i in protos] + sslcontexts.append(quicserver) + sslcontexts.append(quicclient) + if 'h2' in rawprotos: + try: + import h2 + except Exception: + raise Exception('Missing library: "pip3 install h2"') urlpath, _, plugins = url.path.partition(',') urlpath, _, lbind = urlpath.partition('@') plugins = plugins.split(',') if plugins else None @@ -611,6 +835,10 @@ def proxy_by_uri(uri, jump): host_name=host_name, port=port, unix=not loc, lbind=lbind, sslclient=sslclient, sslserver=sslserver) if 'quic' in rawprotos: proxy = ProxyQUIC(quicserver, quicclient, **params) + elif 'h3' in protonames: + proxy = ProxyH3(quicserver, quicclient, **params) + elif 'h2' in protonames: + proxy = ProxyH2(**params) elif 'ssh' in protonames: proxy = ProxySSH(**params) else: @@ -646,7 +874,7 @@ async def test_url(url, rserver): print(headers.decode()[:-4]) print(f'--------------------------------') body = bytearray() - while 1: + while not reader.at_eof(): s = await reader.read(65536) if not s: break @@ -654,7 +882,20 @@ async def test_url(url, rserver): print(body.decode('utf8', 'ignore')) print(f'============ success ============') -def main(): +def print_server_started(option, server, print_fn): + for s in server.sockets: + # https://github.com/MagicStack/uvloop/blob/master/uvloop/pseudosock.pyx + laddr = s.getsockname() # tuple size varies with protocol family + h = laddr[0] + p = laddr[1] + f = str(s.family) + ipversion = "ipv4" if f == "AddressFamily.AF_INET" else ("ipv6" if f == "AddressFamily.AF_INET6" else "ipv?") # TODO better + bind = ipversion+' '+h+':'+str(p) + print_fn(option, bind) + +def main(args = None): + origin_argv = sys.argv[1:] if args is None else args + parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks4,socks5,shadowsocks,shadowsocksr,redirect,pf,tunnel', epilog=f'Online help: <{__url__}>') parser.add_argument('-l', dest='listen', default=[], action='append', type=proxies_by_uri, help='tcp server uri (default: http+socks4+socks5://:8080/)') parser.add_argument('-r', dest='rserver', default=[], action='append', type=proxies_by_uri, help='tcp remote server uri (default: direct)') @@ -674,18 +915,11 @@ def main(): parser.add_argument('--daemon', dest='daemon', action='store_true', help='run as a daemon (Linux only)') parser.add_argument('--test', help='test this url for all remote proxies and exit') parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}') - args = parser.parse_args() + args = parser.parse_args(args) if args.sslfile: sslfile = args.sslfile.split(',') - for option in args.listen: - if option.sslclient: - option.sslclient.load_cert_chain(*sslfile) - option.sslserver.load_cert_chain(*sslfile) - for option in args.listen+args.ulisten+args.rserver+args.urserver: - if isinstance(option, ProxyQUIC): - option.quicserver.load_cert_chain(*sslfile) - if isinstance(option, ProxyBackward) and isinstance(option.backward, ProxyQUIC): - option.backward.quicserver.load_cert_chain(*sslfile) + for context in sslcontexts: + context.load_cert_chain(*sslfile) elif any(map(lambda o: o.sslclient or isinstance(o, ProxyQUIC), args.listen+args.ulisten)): print('You must specify --ssl to listen in ssl mode') return @@ -723,27 +957,37 @@ def main(): from . import verbose verbose.setup(loop, args) servers = [] + admin.config.update({'argv': origin_argv, 'servers': servers, 'args': args, 'loop': loop}) + def print_fn(option, bind=None): + print('Serving on', (bind or option.bind), 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') for option in args.listen: - print('Serving on', option.bind, 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') try: server = loop.run_until_complete(option.start_server(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) + def print_fn(option, bind=None): + print('Serving on UDP', (bind or option.bind), 'by', ",".join(i.name for i in option.protos), f'({option.cipher.name})' if option.cipher else '') for option in args.ulisten: - print('Serving on UDP', option.bind, 'by', ",".join(i.name for i in option.protos), f'({option.cipher.name})' if option.cipher else '') try: server, protocol = loop.run_until_complete(option.udp_start_server(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) + def print_fn(option, bind=None): + print('Serving on', (bind or option.bind), 'backward by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') for option in args.rserver: if isinstance(option, ProxyBackward): - print('Serving on', option.bind, 'backward by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') try: server = loop.run_until_complete(option.start_backward_client(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) if servers: if args.sys: @@ -765,6 +1009,9 @@ def main(): if hasattr(server, 'wait_closed'): loop.run_until_complete(server.wait_closed()) loop.run_until_complete(loop.shutdown_asyncgens()) + if admin.config.get('reload', False): + admin.config['reload'] = False + main(admin.config['argv']) loop.close() if __name__ == '__main__':