From ee1edffd23aa1006c9edf706b2f43762bd1415f3 Mon Sep 17 00:00:00 2001 From: qwj Date: Sat, 20 Feb 2021 22:19:44 +0800 Subject: [PATCH 01/23] h2 proto --- pproxy/proto.py | 73 ++++++++++++++++------ pproxy/server.py | 154 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 192 insertions(+), 35 deletions(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index 7c66d5e..517e99f 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -283,43 +283,51 @@ async def guess(self, reader, **kw): header = await reader.read_w(4) reader.rollback(header) return header in (b'GET ', b'HEAD', b'POST', b'PUT ', b'DELE', b'CONN', b'OPTI', b'TRAC', b'PATC') - async def accept(self, reader, user, writer, users, authtable, httpget=None, **kw): + async def accept(self, reader, user, writer, **kw): lines = await reader.read_until(b'\r\n\r\n') headers = lines[:-4].decode().split('\r\n') method, path, ver = HTTP_LINE.match(headers.pop(0)).groups() lines = '\r\n'.join(i for i in headers if not i.startswith('Proxy-')) headers = dict(i.split(': ', 1) for i in headers if ': ' in i) + async def reply(code, message, body=None, wait=False): + writer.write(message) + if body: + writer.write(body) + if wait: + await writer.drain() + return await self.http_accept(user, method, path, None, ver, lines, headers['Host'], headers.get('Proxy-Authorization'), reply, **kw) + async def http_accept(self, user, method, path, authority, ver, lines, host, pauth, reply, authtable, users, httpget, **kw): url = urllib.parse.urlparse(path) - if method == 'GET' and not url.hostname and httpget: - for path, text in httpget.items(): + if method == 'GET' and not url.hostname: + for path, text in (httpget.items() if httpget else ()): if path == url.path: user = next(filter(lambda x: x.decode()==url.query, users), None) if users else True if user: if users: authtable.set_authed(user) if type(text) is str: - text = (text % dict(host=headers["Host"])).encode() - writer.write(f'{ver} 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(text)}\r\n\r\n'.encode() + text) - await writer.drain() + text = (text % dict(host=host)).encode() + await reply(200, f'{ver} 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(text)}\r\n\r\n'.encode(), text, True) raise Exception('Connection closed') raise Exception(f'404 {method} {url.path}') if users: - pauth = headers.get('Proxy-Authorization', None) user = authtable.authed() if not user: user = next(filter(lambda i: ('Basic '+base64.b64encode(i).decode()) == pauth, users), None) if user is None: - writer.write(f'{ver} 407 Proxy Authentication Required\r\nConnection: close\r\nProxy-Authenticate: Basic realm="simple"\r\n\r\n'.encode()) + await reply(407, f'{ver} 407 Proxy Authentication Required\r\nConnection: close\r\nProxy-Authenticate: Basic realm="simple"\r\n\r\n'.encode(), wait=True) raise Exception('Unauthorized HTTP') authtable.set_authed(user) if method == 'CONNECT': - host_name, port = netloc_split(path) - return user, host_name, port, f'{ver} 200 OK\r\nConnection: close\r\n\r\n'.encode() + host_name, port = netloc_split(authority or path) + return user, host_name, port, lambda writer: reply(200, f'{ver} 200 OK\r\nConnection: close\r\n\r\n'.encode()) else: - url = urllib.parse.urlparse(path) - host_name, port = netloc_split(url.netloc or headers.get("Host"), default_port=80) + host_name, port = netloc_split(url.netloc or host, default_port=80) newpath = url._replace(netloc='', scheme='').geturl() - return user, host_name, port, b'', f'{method} {newpath} {ver}\r\n{lines}\r\n\r\n'.encode() + async def connected(writer): + writer.write(f'{method} {newpath} {ver}\r\n{lines}\r\n\r\n'.encode()) + return True + return user, host_name, port, connected async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1\r\nHost: {myhost}'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n') await reader_remote.read_until(b'\r\n\r\n') @@ -370,6 +378,29 @@ def write(data, o=writer_remote.write): return o(data) writer_remote.write = write +class H2(HTTP): + async def guess(self, reader, **kw): + return True + async def accept(self, reader, user, writer, **kw): + if not writer.headers.done(): + await writer.headers + headers = writer.headers.result() + headers = {i.decode().lower():j.decode() for i,j in headers} + lines = '\r\n'.join(i for i in headers if not i.startswith('proxy-') and not i.startswith(':')) + async def reply(code, message, body=None, wait=False): + writer.send_headers(((':status', str(code)),)) + if body: + writer.write(body) + if wait: + await writer.drain() + return await self.http_accept(user, headers[':method'], headers[':path'], headers[':authority'], '2.0', lines, '', headers.get('proxy-authorization'), reply, **kw) + async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): + headers = [(':method', 'CONNECT'), (':scheme', 'https'), (':path', '/'), + (':authority', f'{host_name}:{port}')] + if rauth: + headers.append(('proxy-authorization', 'Basic '+base64.b64encode(rauth))) + writer_remote.send_headers(headers) + class SSH(BaseProtocol): async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): pass @@ -521,8 +552,8 @@ async def accept(protos, reader, **kw): raise Exception('Connection closed') if user: ret = await proto.accept(reader, user, **kw) - while len(ret) < 5: - ret += (b'',) + while len(ret) < 4: + ret += (None,) return (proto,) + ret raise Exception(f'Unsupported protocol') @@ -533,7 +564,7 @@ def udp_accept(protos, data, **kw): return (proto,) + ret raise Exception(f'Unsupported protocol {data[:10]}') -MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, ssl='', secure='', quic='') +MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, ssl='', secure='', quic='') MAPPINGS['in'] = '' def get_protos(rawprotos): @@ -553,7 +584,7 @@ def get_protos(rawprotos): def sslwrap(reader, writer, sslcontext, server_side=False, server_hostname=None, verbose=None): if sslcontext is None: return reader, writer - ssl_reader = type(reader)() + ssl_reader = asyncio.StreamReader() class Protocol(asyncio.Protocol): def data_received(self, data): ssl_reader.feed_data(data) @@ -576,13 +607,14 @@ def close(self): def _force_close(self, exc): if not self.closed: (verbose or print)(f'{exc} from {writer.get_extra_info("peername")[0]}') + ssl._app_transport._closed = True self.close() def abort(self): self.close() ssl.connection_made(Transport()) async def channel(): try: - while not reader.at_eof() and not ssl._app_transport.closed: + while not reader.at_eof() and not ssl._app_transport._closed: data = await reader.read(65536) if not data: break @@ -600,7 +632,8 @@ def write(self, data): def drain(self): return writer.drain() def is_closing(self): - return ssl._app_transport.closed + return ssl._app_transport._closed def close(self): - ssl._app_transport.close() + if not ssl._app_transport._closed: + ssl._app_transport.close() return ssl_reader, Writer() diff --git a/pproxy/server.py b/pproxy/server.py index 342102b..9afc2ed 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -71,7 +71,7 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, s remote_text = f'{remote_ip}:{remote_port}' local_addr = None if server_ip in ('127.0.0.1', '::1', None) else (server_ip, 0) reader_cipher, _ = await prepare_ciphers(cipher, reader, writer, server_side=False) - lproto, user, host_name, port, lbuf, rbuf = await proto.accept(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip, authtime), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs) + lproto, user, host_name, port, client_connected = await proto.accept(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip, authtime), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs) if host_name == 'echo': asyncio.ensure_future(lproto.channel(reader, writer, DUMMY, DUMMY)) elif host_name == 'empty': @@ -87,13 +87,12 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, s raise Exception(f'Connection timeout {roption.bind}') try: reader_remote, writer_remote = await roption.prepare_connection(reader_remote, writer_remote, host_name, port) - writer.write(lbuf) - writer_remote.write(rbuf) + use_http = (await client_connected(writer_remote)) if client_connected else None except Exception: writer_remote.close() raise Exception('Unknown remote protocol') m = modstat(user, remote_ip, host_name) - lchannel = lproto.http_channel if rbuf else lproto.channel + lchannel = lproto.http_channel if use_http else lproto.channel asyncio.ensure_future(lproto.channel(reader_remote, writer, m(2+roption.direct), m(4+roption.direct))) asyncio.ensure_future(lchannel(reader, writer_remote, m(roption.direct), roption.connection_change)) except Exception as ex: @@ -304,6 +303,126 @@ def start_server(self, args, stream_handler=stream_handler): else: return asyncio.start_server(handler, host=self.host_name, port=self.port, reuse_port=args.get('ruport')) +class ProxyH2(ProxySimple): + def __init__(self, sslserver, sslclient, **kw): + super().__init__(sslserver=None, sslclient=None, **kw) + self.handshake = None + self.h2sslserver = sslserver + self.h2sslclient = sslclient + async def handler(self, reader, writer, client_side=True, stream_handler=None, **kw): + import h2.connection, h2.config, h2.events + reader, writer = proto.sslwrap(reader, writer, self.h2sslclient if client_side else self.h2sslserver, not client_side, None) + config = h2.config.H2Configuration(client_side=client_side) + conn = h2.connection.H2Connection(config=config) + streams = {} + conn.initiate_connection() + writer.write(conn.data_to_send()) + while not reader.at_eof() and not writer.is_closing(): + try: + data = await reader.read(65636) + if not data: + break + events = conn.receive_data(data) + except Exception: + pass + writer.write(conn.data_to_send()) + for event in events: + if isinstance(event, h2.events.RequestReceived) and not client_side: + if event.stream_id not in streams: + stream_reader, stream_writer = self.get_stream(conn, writer, event.stream_id) + streams[event.stream_id] = (stream_reader, stream_writer) + asyncio.ensure_future(stream_handler(stream_reader, stream_writer)) + else: + stream_reader, stream_writer = streams[event.stream_id] + stream_writer.headers.set_result(event.headers) + elif isinstance(event, h2.events.SettingsAcknowledged) and client_side: + self.handshake.set_result((conn, streams, writer)) + elif isinstance(event, h2.events.DataReceived): + stream_reader, stream_writer = streams[event.stream_id] + stream_reader.feed_data(event.data) + conn.acknowledge_received_data(len(event.data), event.stream_id) + writer.write(conn.data_to_send()) + elif isinstance(event, h2.events.StreamEnded) or isinstance(event, h2.events.StreamReset): + stream_reader, stream_writer = streams[event.stream_id] + stream_reader.feed_eof() + if not stream_writer.closed: + stream_writer.close() + elif isinstance(event, h2.events.ConnectionTerminated): + break + elif isinstance(event, h2.events.WindowUpdated): + if event.stream_id in streams: + stream_reader, stream_writer = streams[event.stream_id] + stream_writer.window_update() + writer.write(conn.data_to_send()) + writer.close() + def get_stream(self, conn, writer, stream_id): + reader = asyncio.StreamReader() + write_buffer = bytearray() + write_wait = asyncio.Event() + write_full = asyncio.Event() + class StreamWriter(): + def __init__(self): + self.closed = False + self.headers = asyncio.get_event_loop().create_future() + def get_extra_info(self, key): + return writer.get_extra_info(key) + def write(self, data): + write_buffer.extend(data) + write_wait.set() + def drain(self): + writer.write(conn.data_to_send()) + return writer.drain() + def is_closing(self): + return self.closed + def close(self): + self.closed = True + write_wait.set() + def window_update(self): + write_full.set() + def send_headers(self, headers): + conn.send_headers(stream_id, headers) + writer.write(conn.data_to_send()) + stream_writer = StreamWriter() + async def write_job(): + while not stream_writer.closed: + while len(write_buffer) > 0: + while conn.local_flow_control_window(stream_id) <= 0: + write_full.clear() + await write_full.wait() + if stream_writer.closed: + break + chunk_size = min(conn.local_flow_control_window(stream_id), len(write_buffer), conn.max_outbound_frame_size) + conn.send_data(stream_id, write_buffer[:chunk_size]) + writer.write(conn.data_to_send()) + del write_buffer[:chunk_size] + if not stream_writer.closed: + write_wait.clear() + await write_wait.wait() + conn.send_data(stream_id, b'', end_stream=True) + writer.write(conn.data_to_send()) + asyncio.ensure_future(write_job()) + return reader, stream_writer + async def wait_h2_connection(self, local_addr, family): + if self.handshake is not None: + if not self.handshake.done(): + await self.handshake + else: + self.handshake = asyncio.get_event_loop().create_future() + reader, writer = await super().wait_open_connection(None, None, local_addr, family) + asyncio.ensure_future(self.handler(reader, writer)) + await self.handshake + return self.handshake.result() + async def wait_open_connection(self, host, port, local_addr, family): + conn, streams, writer = await self.wait_h2_connection(local_addr, family) + stream_id = conn.get_next_available_stream_id() + conn._begin_new_stream(stream_id, stream_id%2) + stream_reader, stream_writer = self.get_stream(conn, writer, stream_id) + streams[stream_id] = (stream_reader, stream_writer) + return stream_reader, stream_writer + def start_server(self, args, stream_handler=stream_handler): + handler = functools.partial(stream_handler, **vars(self), **args) + return super().start_server(args, functools.partial(self.handler, client_side=False, stream_handler=handler)) + class ProxyQUIC(ProxySimple): def __init__(self, quicserver, quicclient, **kw): super().__init__(**kw) @@ -544,6 +663,8 @@ def proxies_by_uri(uri_jumps): jump = proxy_by_uri(uri, jump) return jump +sslcontexts = [] + def proxy_by_uri(uri, jump): scheme, _, uri = uri.partition('://') url = urllib.parse.urlparse('s://'+uri) @@ -558,6 +679,8 @@ def proxy_by_uri(uri, jump): if 'ssl' in rawprotos: sslclient.check_hostname = False sslclient.verify_mode = ssl.CERT_NONE + sslcontexts.append(sslserver) + sslcontexts.append(sslclient) else: sslserver = sslclient = None if 'quic' in rawprotos: @@ -565,10 +688,16 @@ def proxy_by_uri(uri, jump): import ssl, aioquic.quic.configuration except Exception: raise Exception('Missing library: "pip3 install aioquic"') - import logging quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False) quicclient = aioquic.quic.configuration.QuicConfiguration() quicclient.verify_mode = ssl.CERT_NONE + sslcontexts.append(quicserver) + sslcontexts.append(quicclient) + if 'h2' in rawprotos: + try: + import h2 + except Exception: + raise Exception('Missing library: "pip3 install h2"') protonames = [i.name for i in protos] urlpath, _, plugins = url.path.partition(',') urlpath, _, lbind = urlpath.partition('@') @@ -611,6 +740,8 @@ def proxy_by_uri(uri, jump): host_name=host_name, port=port, unix=not loc, lbind=lbind, sslclient=sslclient, sslserver=sslserver) if 'quic' in rawprotos: proxy = ProxyQUIC(quicserver, quicclient, **params) + elif 'h2' in protonames: + proxy = ProxyH2(**params) elif 'ssh' in protonames: proxy = ProxySSH(**params) else: @@ -646,7 +777,7 @@ async def test_url(url, rserver): print(headers.decode()[:-4]) print(f'--------------------------------') body = bytearray() - while 1: + while not reader.at_eof(): s = await reader.read(65536) if not s: break @@ -677,15 +808,8 @@ def main(): args = parser.parse_args() if args.sslfile: sslfile = args.sslfile.split(',') - for option in args.listen: - if option.sslclient: - option.sslclient.load_cert_chain(*sslfile) - option.sslserver.load_cert_chain(*sslfile) - for option in args.listen+args.ulisten+args.rserver+args.urserver: - if isinstance(option, ProxyQUIC): - option.quicserver.load_cert_chain(*sslfile) - if isinstance(option, ProxyBackward) and isinstance(option.backward, ProxyQUIC): - option.backward.quicserver.load_cert_chain(*sslfile) + for context in sslcontexts: + context.load_cert_chain(*sslfile) elif any(map(lambda o: o.sslclient or isinstance(o, ProxyQUIC), args.listen+args.ulisten)): print('You must specify --ssl to listen in ssl mode') return From 1912b35d9074a4c85d828dfc7170270c2cce61d1 Mon Sep 17 00:00:00 2001 From: qwj Date: Sat, 20 Feb 2021 22:24:52 +0800 Subject: [PATCH 02/23] h2 proto --- pproxy/proto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index 517e99f..d328802 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -295,7 +295,7 @@ async def reply(code, message, body=None, wait=False): writer.write(body) if wait: await writer.drain() - return await self.http_accept(user, method, path, None, ver, lines, headers['Host'], headers.get('Proxy-Authorization'), reply, **kw) + return await self.http_accept(user, method, path, None, ver, lines, headers.get('Host', ''), headers.get('Proxy-Authorization'), reply, **kw) async def http_accept(self, user, method, path, authority, ver, lines, host, pauth, reply, authtable, users, httpget, **kw): url = urllib.parse.urlparse(path) if method == 'GET' and not url.hostname: From 5326c1785a5810e9ba81986e8e840d8d6e116663 Mon Sep 17 00:00:00 2001 From: qwj Date: Sat, 20 Feb 2021 22:36:13 +0800 Subject: [PATCH 03/23] h2 proto --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index 953c4a0..b803f2a 100644 --- a/README.rst +++ b/README.rst @@ -98,6 +98,8 @@ Protocols | http | | ✔ | | | httponly:// | | (get,post,etc) | | | | | (as client) | +-------------------+------------+------------+------------+------------+--------------+ +| http v2 (connect) | ✔ | ✔ | | | h2:// | ++-------------------+------------+------------+------------+------------+--------------+ | https | ✔ | ✔ | | | http+ssl:// | +-------------------+------------+------------+------------+------------+--------------+ | socks4 | ✔ | ✔ | | | socks4:// | From 7f9a52332e0fbcf2c9e232bd0cbddcd78c6a8adc Mon Sep 17 00:00:00 2001 From: qwj Date: Sun, 21 Feb 2021 03:01:22 +0800 Subject: [PATCH 04/23] h3 proto --- README.rst | 5 ++- pproxy/proto.py | 5 ++- pproxy/server.py | 93 ++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 99 insertions(+), 4 deletions(-) diff --git a/README.rst b/README.rst index b803f2a..c82de33 100644 --- a/README.rst +++ b/README.rst @@ -12,7 +12,7 @@ python-proxy .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy -HTTP/Socks4/Socks5/Shadowsocks/ShadowsocksR/SSH/Redirect/Pf TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. +HTTP/Socks4/Socks5/Shadowsocks/ShadowsocksR/SSH/Redirect/Pf/HTTP2/HTTP3/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. QuickStart ---------- @@ -75,6 +75,7 @@ Features - Incoming traffic auto-detect. - Tunnel/jump/backward-jump support. - Unix domain socket support. +- HTTP v2, HTTP v3 (based on QUIC) - User/password authentication support. - Filter/block hostname by regex patterns. - SSL/TLS client/server support. @@ -100,6 +101,8 @@ Protocols +-------------------+------------+------------+------------+------------+--------------+ | http v2 (connect) | ✔ | ✔ | | | h2:// | +-------------------+------------+------------+------------+------------+--------------+ +| http v3 (connect) | ✔ | ✔ | | | h3:// | ++-------------------+------------+------------+------------+------------+--------------+ | https | ✔ | ✔ | | | http+ssl:// | +-------------------+------------+------------+------------+------------+--------------+ | socks4 | ✔ | ✔ | | | socks4:// | diff --git a/pproxy/proto.py b/pproxy/proto.py index d328802..b701c1c 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -401,6 +401,9 @@ async def connect(self, reader_remote, writer_remote, rauth, host_name, port, my headers.append(('proxy-authorization', 'Basic '+base64.b64encode(rauth))) writer_remote.send_headers(headers) +class H3(H2): + pass + class SSH(BaseProtocol): async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): pass @@ -564,7 +567,7 @@ def udp_accept(protos, data, **kw): return (proto,) + ret raise Exception(f'Unsupported protocol {data[:10]}') -MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, ssl='', secure='', quic='') +MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, h3=H3, ssl='', secure='', quic='') MAPPINGS['in'] = '' def get_protos(rawprotos): diff --git a/pproxy/server.py b/pproxy/server.py index 9afc2ed..0956aa9 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -508,6 +508,93 @@ def handler(reader, writer): asyncio.ensure_future(stream_handler(reader, writer, **vars(self), **args)) return aioquic.asyncio.serve(self.host_name, self.port, configuration=self.quicserver, stream_handler=handler) +class ProxyH3(ProxyQUIC): + def get_stream(self, conn, stream_id): + remote_addr = conn._quic._network_paths[0].addr + reader = asyncio.StreamReader() + class StreamWriter(): + def __init__(self): + self.closed = False + self.headers = asyncio.get_event_loop().create_future() + def get_extra_info(self, key): + return dict(peername=remote_addr, sockname=remote_addr).get(key) + def write(self, data): + conn.http.send_data(stream_id, data, False) + conn.transmit() + async def drain(self): + conn.transmit() + def is_closing(self): + return self.closed + def close(self): + if not self.closed: + conn.http.send_data(stream_id, b'', True) + conn.transmit() + conn.close_stream(stream_id) + self.closed = True + def send_headers(self, headers): + conn.http.send_headers(stream_id, [(i.encode(), j.encode()) for i, j in headers]) + conn.transmit() + return reader, StreamWriter() + def get_protocol(self, server_side=False, handler=None): + import aioquic.asyncio, aioquic.quic.events, aioquic.h3.connection, aioquic.h3.events + class Protocol(aioquic.asyncio.QuicConnectionProtocol): + def __init__(s, *args, **kw): + super().__init__(*args, **kw) + s.http = aioquic.h3.connection.H3Connection(s._quic) + s.streams = {} + def quic_event_received(s, event): + if not server_side: + if isinstance(event, aioquic.quic.events.HandshakeCompleted): + self.handshake.set_result(s) + elif isinstance(event, aioquic.quic.events.ConnectionTerminated): + self.handshake = None + self.quic_egress_acm = None + if s.http is not None: + for http_event in s.http.handle_event(event): + s.http_event_received(http_event) + def http_event_received(s, event): + if isinstance(event, aioquic.h3.events.HeadersReceived): + if event.stream_id not in s.streams and server_side: + reader, writer = s.create_stream(event.stream_id) + writer.headers.set_result(event.headers) + asyncio.ensure_future(handler(reader, writer)) + elif isinstance(event, aioquic.h3.events.DataReceived) and event.stream_id in s.streams: + reader, writer = s.streams[event.stream_id] + if event.data: + reader.feed_data(event.data) + if event.stream_ended: + reader.feed_eof() + s.close_stream(event.stream_id) + def create_stream(s, stream_id=None): + if stream_id is None: + stream_id = s._quic.get_next_available_stream_id(False) + s._quic._get_or_create_stream_for_send(stream_id) + reader, writer = self.get_stream(s, stream_id) + s.streams[stream_id] = (reader, writer) + return reader, writer + def close_stream(s, stream_id): + if stream_id in s.streams: + reader, writer = s.streams[stream_id] + if reader.at_eof() and writer.is_closing(): + s.streams.pop(stream_id) + return Protocol + async def wait_h3_connection(self): + if self.handshake is not None: + if not self.handshake.done(): + await self.handshake + else: + import aioquic.asyncio + self.handshake = asyncio.get_event_loop().create_future() + self.quic_egress_acm = aioquic.asyncio.connect(self.host_name, self.port, create_protocol=self.get_protocol(), configuration=self.quicclient) + conn = await self.quic_egress_acm.__aenter__() + await self.handshake + async def wait_open_connection(self, *args): + await self.wait_h3_connection() + return self.handshake.result().create_stream() + def start_server(self, args, stream_handler=stream_handler): + import aioquic.asyncio + return aioquic.asyncio.serve(self.host_name, self.port, configuration=self.quicserver, create_protocol=self.get_protocol(True, functools.partial(stream_handler, **vars(self), **args))) + class ProxySSH(ProxySimple): def __init__(self, **kw): super().__init__(**kw) @@ -670,6 +757,7 @@ def proxy_by_uri(uri, jump): url = urllib.parse.urlparse('s://'+uri) rawprotos = [i.lower() for i in scheme.split('+')] err_str, protos = proto.get_protos(rawprotos) + protonames = [i.name for i in protos] if err_str: raise argparse.ArgumentTypeError(err_str) if 'ssl' in rawprotos or 'secure' in rawprotos: @@ -683,7 +771,7 @@ def proxy_by_uri(uri, jump): sslcontexts.append(sslclient) else: sslserver = sslclient = None - if 'quic' in rawprotos: + if 'quic' in rawprotos or 'h3' in protonames: try: import ssl, aioquic.quic.configuration except Exception: @@ -698,7 +786,6 @@ def proxy_by_uri(uri, jump): import h2 except Exception: raise Exception('Missing library: "pip3 install h2"') - protonames = [i.name for i in protos] urlpath, _, plugins = url.path.partition(',') urlpath, _, lbind = urlpath.partition('@') plugins = plugins.split(',') if plugins else None @@ -740,6 +827,8 @@ def proxy_by_uri(uri, jump): host_name=host_name, port=port, unix=not loc, lbind=lbind, sslclient=sslclient, sslserver=sslserver) if 'quic' in rawprotos: proxy = ProxyQUIC(quicserver, quicclient, **params) + elif 'h3' in protonames: + proxy = ProxyH3(quicserver, quicclient, **params) elif 'h2' in protonames: proxy = ProxyH2(**params) elif 'ssh' in protonames: From 401186ac9589aeaa61227e1a581fc3f86cc4a566 Mon Sep 17 00:00:00 2001 From: qwj Date: Sun, 21 Feb 2021 12:21:47 +0800 Subject: [PATCH 05/23] h3 proto --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index c82de33..24a982a 100644 --- a/README.rst +++ b/README.rst @@ -12,7 +12,7 @@ python-proxy .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy -HTTP/Socks4/Socks5/Shadowsocks/ShadowsocksR/SSH/Redirect/Pf/HTTP2/HTTP3/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. +HTTP/HTTP2/HTTP3/Socks4/Socks5/Shadowsocks/SSH/Redirect/Pf/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. QuickStart ---------- @@ -75,7 +75,7 @@ Features - Incoming traffic auto-detect. - Tunnel/jump/backward-jump support. - Unix domain socket support. -- HTTP v2, HTTP v3 (based on QUIC) +- HTTP v2, HTTP v3 (QUIC) - User/password authentication support. - Filter/block hostname by regex patterns. - SSL/TLS client/server support. From 7562c10773633a39a0bf27b749b5fd7339513d4d Mon Sep 17 00:00:00 2001 From: qwj Date: Sun, 21 Feb 2021 13:22:23 +0800 Subject: [PATCH 06/23] quic config --- pproxy/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index 0956aa9..cbfc699 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -776,8 +776,8 @@ def proxy_by_uri(uri, jump): import ssl, aioquic.quic.configuration except Exception: raise Exception('Missing library: "pip3 install aioquic"') - quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False) - quicclient = aioquic.quic.configuration.QuicConfiguration() + quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**30, max_data=2**30) + quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**30, max_data=2**30) quicclient.verify_mode = ssl.CERT_NONE sslcontexts.append(quicserver) sslcontexts.append(quicclient) From 9fbfbd1ee502a47fd102826b20fd7f2ed1c3643b Mon Sep 17 00:00:00 2001 From: qwj Date: Tue, 23 Feb 2021 10:07:30 +0800 Subject: [PATCH 07/23] fix http api --- pproxy/proto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index b701c1c..8b0e328 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -296,7 +296,7 @@ async def reply(code, message, body=None, wait=False): if wait: await writer.drain() return await self.http_accept(user, method, path, None, ver, lines, headers.get('Host', ''), headers.get('Proxy-Authorization'), reply, **kw) - async def http_accept(self, user, method, path, authority, ver, lines, host, pauth, reply, authtable, users, httpget, **kw): + async def http_accept(self, user, method, path, authority, ver, lines, host, pauth, reply, authtable, users, httpget=None, **kw): url = urllib.parse.urlparse(path) if method == 'GET' and not url.hostname: for path, text in (httpget.items() if httpget else ()): From c3a78e620af0e0443c80e0144fa1c8e5b0239c45 Mon Sep 17 00:00:00 2001 From: qwj Date: Wed, 24 Feb 2021 12:53:11 +0800 Subject: [PATCH 08/23] QUIC description --- README.rst | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 24a982a..104daee 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,7 @@ python-proxy ============ -|made-with-python| |PyPI-version| |Hit-Count| |Downloads| +|made-with-python| |PyPI-version| |Hit-Count| |Downloads| |Downloads(month)| |Downloads(week)| .. |made-with-python| image:: https://img.shields.io/badge/Made%20with-Python-1f425f.svg :target: https://www.python.org/ @@ -11,6 +11,10 @@ python-proxy :target: https://pypi.python.org/pypi/pproxy/ .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy +.. |Downloads(month)| image:: https://pepy.tech/badge/pproxy/month) + :target: https://pepy.tech/project/pproxy +.. |Downloads(week)| image:: https://pepy.tech/badge/pproxy/week) + :target: https://pepy.tech/project/pproxy HTTP/HTTP2/HTTP3/Socks4/Socks5/Shadowsocks/SSH/Redirect/Pf/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. @@ -101,7 +105,7 @@ Protocols +-------------------+------------+------------+------------+------------+--------------+ | http v2 (connect) | ✔ | ✔ | | | h2:// | +-------------------+------------+------------+------------+------------+--------------+ -| http v3 (connect) | ✔ | ✔ | | | h3:// | +| http v3 (connect) | ✔ by UDP | ✔ by UDP | | | h3:// | +-------------------+------------+------------+------------+------------+--------------+ | https | ✔ | ✔ | | | http+ssl:// | +-------------------+------------+------------+------------+------------+--------------+ @@ -121,7 +125,7 @@ Protocols +-------------------+------------+------------+------------+------------+--------------+ | ssh tunnel | | ✔ | | | ssh:// | +-------------------+------------+------------+------------+------------+--------------+ -| quic | ✔ | ✔ | ✔ | ✔ | http+quic:// | +| quic | ✔ by UDP | ✔ by UDP | ✔ | ✔ | http+quic:// | +-------------------+------------+------------+------------+------------+--------------+ | iptables nat | ✔ | | | | redir:// | +-------------------+------------+------------+------------+------------+--------------+ @@ -722,16 +726,17 @@ Examples - QUIC protocol example - QUIC is a UDP stream protocol in HTTP/3. Library **aioquic** is required if you want to proxy via QUIC. + QUIC is a UDP stream protocol used in HTTP/3. Library **aioquic** is required if you want to proxy via QUIC. + QUIC is listened on UDP port, but can handle TCP or UDP traffic. If you want to handle TCP traffic, you should use "-l quic+http" instead of "-ul quic+http". .. code:: rst $ pip3 install aioquic - $ pproxy --ssl ssl.crt,ssl.key -l quic://:1234 + $ pproxy --ssl ssl.crt,ssl.key -l quic+http://:1234 On the client: - $ pproxy -r quic://server:1234 + $ pproxy -r quic+http://server:1234 QUIC protocol can transfer a lot of TCP streams on one single UDP stream. If the connection number is hugh, QUIC can benefit by reducing TCP handshake time. From cc9dedf0755e5ced5c95c8f540479faa03846980 Mon Sep 17 00:00:00 2001 From: qwj Date: Wed, 24 Feb 2021 12:54:33 +0800 Subject: [PATCH 09/23] QUIC description --- README.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 104daee..26f7ef6 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,7 @@ python-proxy ============ -|made-with-python| |PyPI-version| |Hit-Count| |Downloads| |Downloads(month)| |Downloads(week)| +|made-with-python| |PyPI-version| |Hit-Count| |Downloads| |Downloads-month| |Downloads-week| .. |made-with-python| image:: https://img.shields.io/badge/Made%20with-Python-1f425f.svg :target: https://www.python.org/ @@ -11,9 +11,9 @@ python-proxy :target: https://pypi.python.org/pypi/pproxy/ .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy -.. |Downloads(month)| image:: https://pepy.tech/badge/pproxy/month) +.. |Downloads-month| image:: https://pepy.tech/badge/pproxy/month) :target: https://pepy.tech/project/pproxy -.. |Downloads(week)| image:: https://pepy.tech/badge/pproxy/week) +.. |Downloads-week| image:: https://pepy.tech/badge/pproxy/week) :target: https://pepy.tech/project/pproxy HTTP/HTTP2/HTTP3/Socks4/Socks5/Shadowsocks/SSH/Redirect/Pf/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. From 46128d7a38288e9e4bf8b0a584a7144b871c01ed Mon Sep 17 00:00:00 2001 From: qwj Date: Wed, 24 Feb 2021 12:56:21 +0800 Subject: [PATCH 10/23] QUIC description --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 26f7ef6..5b6a455 100644 --- a/README.rst +++ b/README.rst @@ -11,9 +11,9 @@ python-proxy :target: https://pypi.python.org/pypi/pproxy/ .. |Downloads| image:: https://pepy.tech/badge/pproxy :target: https://pepy.tech/project/pproxy -.. |Downloads-month| image:: https://pepy.tech/badge/pproxy/month) +.. |Downloads-month| image:: https://pepy.tech/badge/pproxy/month :target: https://pepy.tech/project/pproxy -.. |Downloads-week| image:: https://pepy.tech/badge/pproxy/week) +.. |Downloads-week| image:: https://pepy.tech/badge/pproxy/week :target: https://pepy.tech/project/pproxy HTTP/HTTP2/HTTP3/Socks4/Socks5/Shadowsocks/SSH/Redirect/Pf/QUIC TCP/UDP asynchronous tunnel proxy implemented in Python3 asyncio. From 9f0ce6d0c5787bb29bb2206921c8183c2c7eb066 Mon Sep 17 00:00:00 2001 From: qwj Date: Wed, 24 Feb 2021 19:49:36 +0800 Subject: [PATCH 11/23] quic param --- pproxy/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index cbfc699..df9ddc6 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -776,8 +776,8 @@ def proxy_by_uri(uri, jump): import ssl, aioquic.quic.configuration except Exception: raise Exception('Missing library: "pip3 install aioquic"') - quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**30, max_data=2**30) - quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**30, max_data=2**30) + quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**50, max_data=2**50, idle_timeout=60*60) + quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**50, max_data=2**50, idle_timeout=60*60) quicclient.verify_mode = ssl.CERT_NONE sslcontexts.append(quicserver) sslcontexts.append(quicclient) From 542cc8ac40614587e90a1ca286c48534faa35876 Mon Sep 17 00:00:00 2001 From: qwj Date: Thu, 25 Feb 2021 08:09:11 +0800 Subject: [PATCH 12/23] quic config --- pproxy/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index df9ddc6..5b75331 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -776,8 +776,8 @@ def proxy_by_uri(uri, jump): import ssl, aioquic.quic.configuration except Exception: raise Exception('Missing library: "pip3 install aioquic"') - quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**50, max_data=2**50, idle_timeout=60*60) - quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**50, max_data=2**50, idle_timeout=60*60) + quicserver = aioquic.quic.configuration.QuicConfiguration(is_client=False, max_stream_data=2**60, max_data=2**60, idle_timeout=SOCKET_TIMEOUT) + quicclient = aioquic.quic.configuration.QuicConfiguration(max_stream_data=2**60, max_data=2**60, idle_timeout=SOCKET_TIMEOUT*5) quicclient.verify_mode = ssl.CERT_NONE sslcontexts.append(quicserver) sslcontexts.append(quicclient) From b5731160fa109ddaebef933d3724475555e9f861 Mon Sep 17 00:00:00 2001 From: Jonney Date: Sun, 14 Mar 2021 17:17:53 +0800 Subject: [PATCH 13/23] Another way to run pproxy within a python script import sys,pproxy args = '-l socks5://0.0.0.0:1234' pproxy.server.main(args.split(' ')) --- pproxy/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index 5b75331..4e47029 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -874,7 +874,7 @@ async def test_url(url, rserver): print(body.decode('utf8', 'ignore')) print(f'============ success ============') -def main(): +def main(args = None): parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks4,socks5,shadowsocks,shadowsocksr,redirect,pf,tunnel', epilog=f'Online help: <{__url__}>') parser.add_argument('-l', dest='listen', default=[], action='append', type=proxies_by_uri, help='tcp server uri (default: http+socks4+socks5://:8080/)') parser.add_argument('-r', dest='rserver', default=[], action='append', type=proxies_by_uri, help='tcp remote server uri (default: direct)') @@ -894,7 +894,7 @@ def main(): parser.add_argument('--daemon', dest='daemon', action='store_true', help='run as a daemon (Linux only)') parser.add_argument('--test', help='test this url for all remote proxies and exit') parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}') - args = parser.parse_args() + args = parser.parse_args(args) if args.sslfile: sslfile = args.sslfile.split(',') for context in sslcontexts: From 9061180260ace608a7b16a1360b5657468364724 Mon Sep 17 00:00:00 2001 From: Jonney Date: Mon, 15 Mar 2021 13:00:10 +0800 Subject: [PATCH 14/23] Forget to return in setup_iv() Exception: 'NoneType' object has no attribute 'iv' from 127.0.0.1 --- pproxy/cipher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pproxy/cipher.py b/pproxy/cipher.py index 12af3ce..2dfbd8e 100644 --- a/pproxy/cipher.py +++ b/pproxy/cipher.py @@ -43,6 +43,7 @@ def setup_iv(self, iv=None): self._buffer = bytearray() self._declen = None self.setup() + return self @property def nonce(self): ret = self._nonce.to_bytes(self.NONCE_LENGTH, 'little') From 3fe4996abccafaccddcb0790c282818403ab3b79 Mon Sep 17 00:00:00 2001 From: Jonney Date: Wed, 17 Mar 2021 17:09:16 +0800 Subject: [PATCH 15/23] UDP multiple jumps not working Forget to decrypt for next jump --- pproxy/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/server.py b/pproxy/server.py index 4e47029..d13aa37 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -268,7 +268,7 @@ def auth(self): return self.users[0] if self.users else b'' def udp_packet_unpack(self, data): data = self.cipher.datagram.decrypt(data) if self.cipher else data - return self.rproto.udp_unpack(data) + return self.jump.udp_packet_unpack(self.rproto.udp_unpack(data)) def destination(self, host, port): return self.host_name, self.port def udp_prepare_connection(self, host, port, data): From 029eda0e6fd366c574fb687724371c3da338417e Mon Sep 17 00:00:00 2001 From: Aladdin Date: Wed, 21 Apr 2021 11:22:55 +0800 Subject: [PATCH 16/23] connect reason-phrase --- pproxy/proto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index 8b0e328..a6f21d8 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -320,7 +320,7 @@ async def http_accept(self, user, method, path, authority, ver, lines, host, pau authtable.set_authed(user) if method == 'CONNECT': host_name, port = netloc_split(authority or path) - return user, host_name, port, lambda writer: reply(200, f'{ver} 200 OK\r\nConnection: close\r\n\r\n'.encode()) + return user, host_name, port, lambda writer: reply(200, f'{ver} 200 Connection Established\r\nConnection: close\r\n\r\n'.encode()) else: host_name, port = netloc_split(url.netloc or host, default_port=80) newpath = url._replace(netloc='', scheme='').geturl() From 1d276a8c622134d5666f91b455e40ac6c02c6d2b Mon Sep 17 00:00:00 2001 From: Aladdin Date: Wed, 21 Apr 2021 11:28:50 +0800 Subject: [PATCH 17/23] connect reason-phrase --- pproxy/proto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index a6f21d8..eb40287 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -320,7 +320,7 @@ async def http_accept(self, user, method, path, authority, ver, lines, host, pau authtable.set_authed(user) if method == 'CONNECT': host_name, port = netloc_split(authority or path) - return user, host_name, port, lambda writer: reply(200, f'{ver} 200 Connection Established\r\nConnection: close\r\n\r\n'.encode()) + return user, host_name, port, lambda writer: reply(200, f'{ver} 200 Connection established\r\nConnection: close\r\n\r\n'.encode()) else: host_name, port = netloc_split(url.netloc or host, default_port=80) newpath = url._replace(netloc='', scheme='').geturl() From 2e18d25b6b36684103654f7f0f2965882df60f16 Mon Sep 17 00:00:00 2001 From: qwj Date: Sun, 9 May 2021 11:22:55 +0800 Subject: [PATCH 18/23] ssh reconnect --- pproxy/server.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index d13aa37..bc8cd71 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -615,7 +615,7 @@ async def channel(): writer.get_extra_info = dict(peername=remote_addr, sockname=remote_addr).get return reader, writer async def wait_ssh_connection(self, local_addr=None, family=0, tunnel=None): - if self.sshconn is not None: + if self.sshconn is not None and not self.sshconn.cancelled(): if not self.sshconn.done(): await self.sshconn else: @@ -633,18 +633,24 @@ async def wait_ssh_connection(self, local_addr=None, family=0, tunnel=None): conn = await asyncssh.connect(host=self.host_name, port=self.port, local_addr=local_addr, family=family, x509_trusted_certs=None, known_hosts=None, username=username, password=password, client_keys=client_keys, keepalive_interval=60, tunnel=tunnel) self.sshconn.set_result(conn) async def wait_open_connection(self, host, port, local_addr, family, tunnel=None): - await self.wait_ssh_connection(local_addr, family, tunnel) - conn = self.sshconn.result() - if isinstance(self.jump, ProxySSH): - reader, writer = await self.jump.wait_open_connection(host, port, None, None, conn) - else: - host, port = self.jump.destination(host, port) - if self.jump.unix: - reader, writer = await conn.open_unix_connection(self.jump.bind) + try: + await self.wait_ssh_connection(local_addr, family, tunnel) + conn = self.sshconn.result() + if isinstance(self.jump, ProxySSH): + reader, writer = await self.jump.wait_open_connection(host, port, None, None, conn) else: - reader, writer = await conn.open_connection(host, port) - reader, writer = self.patch_stream(reader, writer, host, port) - return reader, writer + host, port = self.jump.destination(host, port) + if self.jump.unix: + reader, writer = await conn.open_unix_connection(self.jump.bind) + else: + reader, writer = await conn.open_connection(host, port) + reader, writer = self.patch_stream(reader, writer, host, port) + return reader, writer + except Exception as ex: + if not self.sshconn.done(): + self.sshconn.set_exception(ex) + self.sshconn = None + raise async def start_server(self, args, stream_handler=stream_handler, tunnel=None): if type(self.jump) is ProxyDirect: raise Exception('ssh server mode unsupported') From 697635c0b7c786faaa6053f8bf2ac514d5bc7260 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 12 Dec 2021 11:56:04 +0100 Subject: [PATCH 19/23] print effective port when port is automatic (zero) --- pproxy/server.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/pproxy/server.py b/pproxy/server.py index d13aa37..1209767 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -874,6 +874,17 @@ async def test_url(url, rserver): print(body.decode('utf8', 'ignore')) print(f'============ success ============') +def print_server_started(option, server, print_fn): + for s in server.sockets: + # https://github.com/MagicStack/uvloop/blob/master/uvloop/pseudosock.pyx + laddr = s.getsockname() # tuple size varies with protocol family + h = laddr[0] + p = laddr[1] + f = str(s.family) + ipversion = "ipv4" if f == "AddressFamily.AF_INET" else ("ipv6" if f == "AddressFamily.AF_INET6" else "ipv?") # TODO better + bind = ipversion+' '+h+':'+str(p) + print_fn(option, bind) + def main(args = None): parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks4,socks5,shadowsocks,shadowsocksr,redirect,pf,tunnel', epilog=f'Online help: <{__url__}>') parser.add_argument('-l', dest='listen', default=[], action='append', type=proxies_by_uri, help='tcp server uri (default: http+socks4+socks5://:8080/)') @@ -936,27 +947,36 @@ def main(args = None): from . import verbose verbose.setup(loop, args) servers = [] + def print_fn(option, bind=None): + print('Serving on', (bind or option.bind), 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') for option in args.listen: - print('Serving on', option.bind, 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') try: server = loop.run_until_complete(option.start_server(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) + def print_fn(option, bind=None): + print('Serving on UDP', (bind or option.bind), 'by', ",".join(i.name for i in option.protos), f'({option.cipher.name})' if option.cipher else '') for option in args.ulisten: - print('Serving on UDP', option.bind, 'by', ",".join(i.name for i in option.protos), f'({option.cipher.name})' if option.cipher else '') try: server, protocol = loop.run_until_complete(option.udp_start_server(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) + def print_fn(option, bind=None): + print('Serving on', (bind or option.bind), 'backward by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') for option in args.rserver: if isinstance(option, ProxyBackward): - print('Serving on', option.bind, 'backward by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') try: server = loop.run_until_complete(option.start_backward_client(vars(args))) + print_server_started(option, server, print_fn) servers.append(server) except Exception as ex: + print_fn(option) print('Start server failed.\n\t==>', ex) if servers: if args.sys: From fe495ce21a2acadf7b3949288bb887fe8f3468c3 Mon Sep 17 00:00:00 2001 From: mervent Date: Wed, 16 Mar 2022 21:04:28 +0300 Subject: [PATCH 20/23] Obey RFC2616 https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html An HTTP/1.1 proxy MUST ensure that any request message it forwards does contain an appropriate Host header field that identifies the service being requested by the proxy. All Internet-based HTTP/1.1 servers MUST respond with a 400 (Bad Request) status code to any HTTP/1.1 request message which lacks a Host header field. --- pproxy/proto.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index eb40287..1572fb4 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -328,8 +328,8 @@ async def connected(writer): writer.write(f'{method} {newpath} {ver}\r\n{lines}\r\n\r\n'.encode()) return True return user, host_name, port, connected - async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): - writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1\r\nHost: {myhost}'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n') + async def connect(self, reader_remote, writer_remote, rauth, host_name, port, **kw): + writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1\r\nHost: {host_name}:{port}'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n') await reader_remote.read_until(b'\r\n\r\n') async def http_channel(self, reader, writer, stat_bytes, stat_conn): try: From c970092aba12ece06ca466a11c8c45492fc357a4 Mon Sep 17 00:00:00 2001 From: Jonney Date: Mon, 27 Mar 2023 19:58:03 +0800 Subject: [PATCH 21/23] Compatible with Python 3.11 In Python 3.11, asyncio.sslproto.SSLProtocol inherits from asyncio.protocols.BufferedProtocol. In SSLProtocol, data_received() is no longer used and has been replaced with get_buffer() and buffer_updated(). --- pproxy/proto.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index eb40287..a1bd1bb 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -616,12 +616,21 @@ def abort(self): self.close() ssl.connection_made(Transport()) async def channel(): + read_size=65536 + buffer=None + if hasattr(ssl,'get_buffer'): + buffer=ssl.get_buffer(read_size) try: while not reader.at_eof() and not ssl._app_transport._closed: - data = await reader.read(65536) + data = await reader.read(read_size) if not data: break - ssl.data_received(data) + if buffer!=None: + data_len=len(data) + buffer[:data_len]=data + ssl.buffer_updated(data_len) + else: + ssl.data_received(data) except Exception: pass finally: From b153daee60a2f558ba02d63c250651ca1e9c0001 Mon Sep 17 00:00:00 2001 From: v Date: Sun, 21 Jan 2024 17:08:29 +0800 Subject: [PATCH 22/23] feat: httpadmin protocol for external control --- pproxy/admin.py | 39 +++++++++++++++++++++++++++++++++++++++ pproxy/proto.py | 36 ++++++++++++++++++++++++++++++++++-- pproxy/server.py | 10 +++++++++- 3 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 pproxy/admin.py diff --git a/pproxy/admin.py b/pproxy/admin.py new file mode 100644 index 0000000..6428df8 --- /dev/null +++ b/pproxy/admin.py @@ -0,0 +1,39 @@ +import json +import asyncio +config = {} + + +async def reply_http(reply, ver, code, content): + await reply(code, f'{ver} {code}\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(content)}\r\n\r\n'.encode(), content, True) + + +async def status_handler(reply, **kwarg): + method = kwarg.get('method') + if method == 'GET': + data = {"status": "ok"} + value = json.dumps(data).encode() + ver = kwarg.get('ver') + await reply_http(reply, ver, '200 OK', value) + + +async def configs_handler(reply, **kwarg): + method = kwarg.get('method') + ver = kwarg.get('ver') + + if method == 'GET': + data = {"argv": config['argv']} + value = json.dumps(data).encode() + await reply_http(reply, ver, '200 OK', value) + elif method == 'POST': + config['argv'] = kwarg.get('content').decode().split(' ') + config['reload'] = True + data = {"result": 'ok'} + value = json.dumps(data).encode() + await reply_http(reply, ver, '200 OK', value) + raise KeyboardInterrupt + + +httpget = { + '/status': status_handler, + '/configs': configs_handler, +} diff --git a/pproxy/proto.py b/pproxy/proto.py index 8e0472c..b554808 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -1,5 +1,5 @@ import asyncio, socket, urllib.parse, time, re, base64, hmac, struct, hashlib, io, os - +from . import admin HTTP_LINE = re.compile('([^ ]+) +(.+?) +(HTTP/[^ ]+)$') packstr = lambda s, n=1: len(s).to_bytes(n, 'big') + s @@ -404,6 +404,38 @@ async def connect(self, reader_remote, writer_remote, rauth, host_name, port, my class H3(H2): pass + +class HTTPAdmin(HTTP): + async def accept(self, reader, user, writer, **kw): + lines = await reader.read_until(b'\r\n\r\n') + headers = lines[:-4].decode().split('\r\n') + method, path, ver = HTTP_LINE.match(headers.pop(0)).groups() + lines = '\r\n'.join(i for i in headers if not i.startswith('Proxy-')) + headers = dict(i.split(': ', 1) for i in headers if ': ' in i) + async def reply(code, message, body=None, wait=False): + writer.write(message) + if body: + writer.write(body) + if wait: + await writer.drain() + + content_length = int(headers.get('Content-Length','0')) + content = '' + if content_length > 0: + content = await reader.read_n(content_length) + + url = urllib.parse.urlparse(path) + if url.hostname is not None: + raise Exception(f'HTTP Admin Unsupported hostname') + if method in ["GET", "POST", "PUT", "PATCH", "DELETE"]: + for path, handler in admin.httpget.items(): + if path == url.path: + await handler(reply=reply, ver=ver, method=method, headers=headers, lines=lines, content=content) + raise Exception('Connection closed') + raise Exception(f'404 {method} {url.path}') + raise Exception(f'405 {method} not allowed') + + class SSH(BaseProtocol): async def connect(self, reader_remote, writer_remote, rauth, host_name, port, myhost, **kw): pass @@ -567,7 +599,7 @@ def udp_accept(protos, data, **kw): return (proto,) + ret raise Exception(f'Unsupported protocol {data[:10]}') -MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, h3=H3, ssl='', secure='', quic='') +MAPPINGS = dict(direct=Direct, http=HTTP, httponly=HTTPOnly, httpadmin=HTTPAdmin, ssh=SSH, socks5=Socks5, socks4=Socks4, socks=Socks5, ss=SS, ssr=SSR, redir=Redir, pf=Pf, tunnel=Tunnel, echo=Echo, ws=WS, trojan=Trojan, h2=H2, h3=H3, ssl='', secure='', quic='') MAPPINGS['in'] = '' def get_protos(rawprotos): diff --git a/pproxy/server.py b/pproxy/server.py index dc3e4e5..f7c82be 100644 --- a/pproxy/server.py +++ b/pproxy/server.py @@ -1,5 +1,7 @@ -import argparse, time, re, asyncio, functools, base64, random, urllib.parse, socket +import argparse, time, re, asyncio, functools, base64, random, urllib.parse, socket, sys from . import proto +from . import admin + from .__doc__ import * SOCKET_TIMEOUT = 60 @@ -892,6 +894,8 @@ def print_server_started(option, server, print_fn): print_fn(option, bind) def main(args = None): + origin_argv = sys.argv[1:] if args is None else args + parser = argparse.ArgumentParser(description=__description__+'\nSupported protocols: http,socks4,socks5,shadowsocks,shadowsocksr,redirect,pf,tunnel', epilog=f'Online help: <{__url__}>') parser.add_argument('-l', dest='listen', default=[], action='append', type=proxies_by_uri, help='tcp server uri (default: http+socks4+socks5://:8080/)') parser.add_argument('-r', dest='rserver', default=[], action='append', type=proxies_by_uri, help='tcp remote server uri (default: direct)') @@ -953,6 +957,7 @@ def main(args = None): from . import verbose verbose.setup(loop, args) servers = [] + admin.config.update({'argv': origin_argv, 'servers': servers, 'args': args, 'loop': loop}) def print_fn(option, bind=None): print('Serving on', (bind or option.bind), 'by', ",".join(i.name for i in option.protos) + ('(SSL)' if option.sslclient else ''), '({}{})'.format(option.cipher.name, ' '+','.join(i.name() for i in option.cipher.plugins) if option.cipher and option.cipher.plugins else '') if option.cipher else '') for option in args.listen: @@ -1004,6 +1009,9 @@ def print_fn(option, bind=None): if hasattr(server, 'wait_closed'): loop.run_until_complete(server.wait_closed()) loop.run_until_complete(loop.shutdown_asyncgens()) + if admin.config.get('reload', False): + admin.config['reload'] = False + main(admin.config['argv']) loop.close() if __name__ == '__main__': From 10f50139105a4e2a281d0e9e2eb69e52941e81b0 Mon Sep 17 00:00:00 2001 From: Jonney <603073+Jonney@users.noreply.github.com> Date: Thu, 25 Apr 2024 16:36:00 +0800 Subject: [PATCH 23/23] Fix SyntaxWarning pproxy/proto.py:7: SyntaxWarning: invalid escape sequence '\[' --- pproxy/proto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pproxy/proto.py b/pproxy/proto.py index 8e0472c..770ea6a 100644 --- a/pproxy/proto.py +++ b/pproxy/proto.py @@ -4,7 +4,7 @@ packstr = lambda s, n=1: len(s).to_bytes(n, 'big') + s def netloc_split(loc, default_host=None, default_port=None): - ipv6 = re.fullmatch('\[([0-9a-fA-F:]*)\](?::(\d+)?)?', loc) + ipv6 = re.fullmatch(r'\[([0-9a-fA-F:]*)\](?::(\d+)?)?', loc) if ipv6: host_name, port = ipv6.groups() elif ':' in loc: