Skip to content

Commit 0b9962a

Browse files
committed
REPL: add message-based protocol for control; improve factoring
1 parent e1114b4 commit 0b9962a

File tree

4 files changed

+373
-67
lines changed

4 files changed

+373
-67
lines changed

unpythonic/net/client.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import signal
1010
import threading
1111

12+
from .util import mkrecvbuf, recvmsg, sendmsg
13+
1214
__all__ = ["connect"]
1315

1416

@@ -21,10 +23,13 @@ def _handle_alarm(signum, frame):
2123
signal.signal(signal.SIGALRM, _handle_alarm)
2224

2325

24-
def _make_remote_completion_client(sock):
26+
def _make_remote_completion_client(buf, sock):
2527
"""Make a tab completion function for a remote REPL session.
2628
27-
`sock` must be a socket already connected to a `RemoteTabCompletionServer`.
29+
`buf` is a receive buffer for the message protocol (see
30+
`unpythonic.net.util.mkrecvbuf`).
31+
32+
`sock` must be a socket already connected to a `ControlSession`.
2833
The caller is responsible for managing the socket.
2934
3035
The return value can be used as a completer in `readline.set_completer`.
@@ -33,14 +38,11 @@ def complete(text, state):
3338
try:
3439
request = {"text": text, "state": state}
3540
data_out = json.dumps(request).encode("utf-8")
36-
sock.sendall(data_out)
37-
# TODO: must know how to receive until end of message, since TCP doesn't do datagrams
38-
# TODO: build a control channel protocol
39-
# https://docs.python.org/3/howto/sockets.html
40-
data_in = sock.recv(4096).decode("utf-8")
41+
sendmsg(data_out, sock)
42+
data_in = recvmsg(buf, sock).decode("utf-8")
4143
# print("text '{}' state '{}' reply '{}'".format(text, state, data_in))
4244
if not data_in:
43-
print("Tab completion server exited, socket closed!")
45+
print("Control server exited, socket closed!")
4446
return None
4547
reply = json.loads(data_in)
4648
return reply
@@ -70,13 +72,14 @@ class SessionExit(Exception):
7072
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: # remote REPL session
7173
sock.connect(addrspec)
7274

73-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as csock: # remote tab completion
74-
# TODO: configurable tab completion port
75+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as csock: # control channel (remote tab completion, remote Ctrl+C)
76+
# TODO: configurable control port
7577
csock.connect((addrspec[0], 8128)) # TODO: IPv6 support
7678

7779
# Set a custom tab completer for readline.
7880
# https://stackoverflow.com/questions/35115208/is-there-any-way-to-combine-readline-rlcompleter-and-interactiveconsole-in-pytho
79-
completer = _make_remote_completion_client(csock)
81+
cbuf = mkrecvbuf()
82+
completer = _make_remote_completion_client(cbuf, csock)
8083
readline.set_completer(completer)
8184
readline.parse_and_bind("tab: complete")
8285

@@ -96,13 +99,15 @@ def sock_to_stdout():
9699
t = threading.Thread(target=sock_to_stdout, daemon=True)
97100
t.start()
98101

99-
# TODO: fix multiline editing (see repl_tool.py in socketserverREPL for reference,
102+
# TODO: fix multiline editing (see repl_tool.py in socketserverREPL for reference)
100103
#
101104
# This needs prompt detection so we'll know how to set up
102105
# `input`. The first time on a new line, the prompt is sent
103106
# by the server, but then during line editing, it needs to be
104107
# re-printed by `readline`, so `input` needs to know what the
105-
# prompt text should be).
108+
# prompt text should be.
109+
#
110+
# For this, we need to read the socket until we see a new prompt.
106111

107112
# Run readline at the client side. Only the tab completion
108113
# results come from the server.

unpythonic/net/server.py

Lines changed: 28 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,17 @@
7979
import threading
8080
import sys
8181
import os
82-
import select
8382
import time
84-
import socket
8583
import socketserver
8684
import json
8785
import atexit
8886

89-
from .ptyproxy import PTYSocketProxy
9087
from ..collections import ThreadLocalBox, Shim
9188
#from ..misc import async_raise
9289

90+
from .util import mkrecvbuf, recvmsg, sendmsg, ReuseAddrThreadingTCPServer
91+
from .ptyproxy import PTYSocketProxy
92+
9393
_server_instance = None
9494
_active_connections = set()
9595
_halt_pending = False
@@ -128,13 +128,17 @@ def server_print(*values, **kwargs):
128128
"""
129129
print(*values, **kwargs, file=_original_stdout)
130130

131-
class RemoteTabCompletionSession(socketserver.BaseRequestHandler):
132-
"""Entry point for connections to the remote tab completion server.
133131

134-
In a session, a `RemoteTabCompletionClient` sends us requests. We invoke
135-
`rlcompleter` on the server side, and return its response to the client.
132+
class ControlSession(socketserver.BaseRequestHandler):
133+
"""Entry point for connections to the control server.
134+
135+
We use a separate connection for control to avoid head-of-line blocking.
136136
137-
For communication, we use JSON encoded dictionaries. This format was chosen
137+
In a session, the client sends us requests for remote tab completion. We
138+
invoke `rlcompleter` on the server side, and return its response to the
139+
client.
140+
141+
We encode the payload as JSON encoded dictionaries. This format was chosen
138142
instead of pickle to ensure the client and server can talk to each other
139143
regardless of the Python versions on each end of the connection.
140144
"""
@@ -146,43 +150,24 @@ def handle(self):
146150
class ClientExit(Exception):
147151
pass
148152
try:
149-
server_print("Remote tab completion session for {} opened.".format(client_address_str))
153+
server_print("Control channel for {} opened.".format(client_address_str))
150154
# TODO: fancier backend? See examples in https://pymotw.com/3/readline/
151155
backend = rlcompleter.Completer(_console_locals_namespace)
156+
buf = mkrecvbuf()
152157
sock = self.request
153158
while True:
154-
rs, ws, es = select.select([sock], [], [])
155-
for r in rs:
156-
# Control channel protocol:
157-
# - message-based
158-
# - text encoding: utf-8
159-
# - message structure: header body, where
160-
# header:
161-
# 0xFF message start, sync byte (never appears in utf-8 encoded text)
162-
# "v": start of protocol version field
163-
# one byte containing the version, currently the character "1" as utf-8.
164-
# Doesn't need to be a number character, any Unicode codepoint below 127 will do.
165-
# It's unlikely more than 127 - 32 = 95 versions of the protocol are ever needed.
166-
# "l": start of message length field
167-
# netstring containing the length of the message body, as bytes
168-
# Netstring format is e.g. "12:hello world!,"
169-
# so for example, for a 42-byte message body, this is "2:42,".
170-
# The comma is the field terminator.
171-
# body:
172-
# arbitrary payload, depending on the request.
173-
# TODO: must know how to receive until end of message, since TCP doesn't do datagrams
174-
# TODO: build a control channel protocol
175-
# https://docs.python.org/3/howto/sockets.html
176-
data_in = sock.recv(4096).decode("utf-8")
177-
if len(data_in) == 0: # EOF on network socket
178-
raise ClientExit
179-
request = json.loads(data_in)
180-
reply = backend.complete(request["text"], request["state"])
181-
# server_print(request, reply)
182-
data_out = json.dumps(reply).encode("utf-8")
183-
sock.sendall(data_out)
159+
# TODO: Add support for requests to inject Ctrl+C. Needs a command protocol layer.
160+
# TODO: Can use JSON dictionaries; we're guaranteed to get whole messages only.
161+
data_in = recvmsg(buf, sock)
162+
if not data_in:
163+
raise ClientExit
164+
request = json.loads(data_in.decode("utf-8"))
165+
reply = backend.complete(request["text"], request["state"])
166+
# server_print(request, reply)
167+
data_out = json.dumps(reply).encode("utf-8")
168+
sendmsg(data_out, sock)
184169
except ClientExit:
185-
server_print("Remote tab completion session for {} closed.".format(client_address_str))
170+
server_print("Control channel for {} closed.".format(client_address_str))
186171
except BaseException as err:
187172
server_print(err)
188173

@@ -265,17 +250,6 @@ def on_slave_disconnect(adaptor):
265250
_active_connections.remove(id(self))
266251

267252

268-
# https://docs.python.org/3/library/socketserver.html#socketserver.ThreadingTCPServer
269-
# https://docs.python.org/3/library/socketserver.html#socketserver.ThreadingMixIn
270-
# https://docs.python.org/3/library/socketserver.html#socketserver.TCPServer
271-
class ReuseAddrThreadingTCPServer(socketserver.ThreadingTCPServer):
272-
def server_bind(self):
273-
"""Custom server_bind ensuring the socket is available for rebind immediately."""
274-
# from https://stackoverflow.com/a/18858817
275-
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
276-
self.socket.bind(self.server_address)
277-
278-
279253
def start(locals, addrspec=("127.0.0.1", 1337), banner=None):
280254
"""Start the REPL server.
281255
@@ -349,11 +323,11 @@ def start(locals, addrspec=("127.0.0.1", 1337), banner=None):
349323
server_thread = threading.Thread(target=server.serve_forever, name="Unpythonic REPL server", daemon=True)
350324
server_thread.start()
351325

352-
# remote tab completion server
353-
# TODO: configurable tab completion port
326+
# control channel for remote tab completion and remote Ctrl+C requests
327+
# TODO: configurable port
354328
# Default is 8128 because it's for *completing* things, and https://en.wikipedia.org/wiki/Perfect_number
355329
# (This is the first one above 1024, and was already known to Nicomachus around 100 CE.)
356-
cserver = ReuseAddrThreadingTCPServer((addr, 8128), RemoteTabCompletionSession)
330+
cserver = ReuseAddrThreadingTCPServer((addr, 8128), ControlSession)
357331
cserver.daemon_threads = True
358332
cserver_thread = threading.Thread(target=cserver.serve_forever, name="Unpythonic REPL remote tab completion server", daemon=True)
359333
cserver_thread.start()

unpythonic/net/test/test_util.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# -*- coding: utf-8; -*-
2+
3+
import threading
4+
import socket
5+
from time import sleep
6+
7+
from ..util import recvall, \
8+
netstringify, \
9+
mkrecvbuf, recvmsg, sendmsg
10+
11+
addrspec = ("127.0.0.1", 7777)
12+
def nettest(server_recv_func, client_send_func):
13+
"""Harness/fixture.
14+
15+
It doesn't really matter which way client/server are, so we (arbitrarily)
16+
run our tests in a setup where the server receives and the client sends.
17+
18+
server_recv_func: 1-arg callable; take socket, return data read from it.
19+
20+
client_send_func: 1-arg callable; take socket, send data into it.
21+
No return value.
22+
"""
23+
# TODO: IPv6 support
24+
result = []
25+
def recv_server():
26+
try:
27+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
28+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
29+
sock.bind(addrspec)
30+
sock.listen()
31+
conn, addr = sock.accept()
32+
with conn:
33+
data = server_recv_func(conn)
34+
result.append(data)
35+
except Exception as err:
36+
print(err)
37+
def send_client():
38+
try:
39+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
40+
sock.connect(addrspec)
41+
client_send_func(sock)
42+
except Exception as err:
43+
print(err)
44+
ts = threading.Thread(target=recv_server)
45+
tc = threading.Thread(target=send_client)
46+
ts.start()
47+
sleep(0.05)
48+
tc.start()
49+
ts.join()
50+
tc.join()
51+
return result[0]
52+
53+
54+
def test():
55+
assert netstringify(b"hello world") == b"11:hello world,"
56+
57+
# s = server, c = client
58+
59+
# basic use of recvall
60+
s = lambda sock: recvall(1024, sock)
61+
c = lambda sock: [sock.sendall(b"x" * 512), sock.sendall(b"x" * 512)]
62+
assert len(nettest(s, c)) == 1024
63+
64+
# basic use of message protocol
65+
def s2(sock):
66+
buf = mkrecvbuf()
67+
data = recvmsg(buf, sock)
68+
return data
69+
def c2(sock):
70+
sendmsg(b"hello world", sock)
71+
assert nettest(s2, c2) == b"hello world"
72+
73+
# receiving a message gets a whole message and only that message
74+
def s3(sock):
75+
buf = mkrecvbuf()
76+
data = recvmsg(buf, sock)
77+
return data
78+
def c3(sock):
79+
sendmsg(b"hello world", sock)
80+
sendmsg(b"hello again", sock)
81+
assert nettest(s3, c3) == b"hello world"
82+
83+
# messages are received in order
84+
def s4(sock):
85+
buf = mkrecvbuf()
86+
data = []
87+
data.append(recvmsg(buf, sock))
88+
data.append(recvmsg(buf, sock))
89+
return data
90+
def c4(sock):
91+
sendmsg(b"hello world", sock)
92+
sendmsg(b"hello again", sock)
93+
assert nettest(s4, c4) == [b"hello world", b"hello again"]
94+
95+
print("All tests PASSED")
96+
97+
if __name__ == '__main__':
98+
test()

0 commit comments

Comments
 (0)