Skip to content

Commit d15a3b3

Browse files
committed
fix networking for both stream and dgram
1 parent 814cd72 commit d15a3b3

5 files changed

Lines changed: 67 additions & 32 deletions

File tree

src/library.js

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6802,8 +6802,13 @@ LibraryManager.library = {
68026802
socket: function(family, type, protocol) {
68036803
var fd = Sockets.nextFd++;
68046804
assert(fd < 64); // select() assumes socket fd values are in 0..63
6805+
var stream = type == {{{ cDefine('SOCK_STREAM') }}};
6806+
if (protocol) {
6807+
assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if stream, must be tcp
6808+
}
68056809
Sockets.fds[fd] = {
6806-
connected: false
6810+
connected: false,
6811+
stream: stream
68076812
};
68086813
return fd;
68096814
},
@@ -6832,28 +6837,49 @@ LibraryManager.library = {
68326837
var i8Temp = new Uint8Array(i32Temp.buffer);
68336838

68346839
info.inQueue = [];
6840+
if (!info.stream) {
6841+
var partialBuffer = null; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message
6842+
}
6843+
68356844
info.socket.onmessage = function(event) {
68366845
assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data!
68376846
var data = new Uint8Array(event.data); // make a typed array view on the array buffer
68386847
#if SOCKET_DEBUG
68396848
Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]);
68406849
#endif
6841-
#if SOCKET_FORCED_MESSAGING
6842-
var start = 0;
6843-
while (start+4 < data.length) {
6844-
i8Temp.set(data.subarray(start, start+4));
6845-
var currLen = i32Temp[0];
6846-
assert(currLen > 0);
6847-
start += 4;
6848-
assert(start + currLen <= data.length, [data.length, start, currLen]); // must not receive fractured messages!
6849-
info.inQueue.push(data.subarray(start, start+currLen));
6850+
if (info.stream) {
6851+
info.inQueue.push(data);
6852+
} else {
6853+
// we added headers with message sizes, read those to find discrete messages
6854+
if (partialBuffer) {
6855+
// append to the partial buffer
6856+
var newBuffer = new Uint8Array(partialBuffer.length + data.length);
6857+
newBuffer.set(partialBuffer);
6858+
newBuffer.set(data, partialBuffer.length);
6859+
// forget the partial buffer and work on data
6860+
data = newBuffer;
6861+
partialBuffer = null;
6862+
}
6863+
var currPos = 0;
6864+
while (currPos+4 < data.length) {
6865+
i8Temp.set(data.subarray(currPos, currPos+4));
6866+
var currLen = i32Temp[0];
6867+
assert(currLen > 0);
6868+
if (currPos + 4 + currLen > data.length) {
6869+
break; // not enough data has arrived
6870+
}
6871+
currPos += 4;
68506872
#if SOCKET_DEBUG
6851-
Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(start, start+currLen))]);
6873+
Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]);
68526874
#endif
6853-
start += currLen;
6875+
info.inQueue.push(data.subarray(currPos, currPos+currLen));
6876+
currPos += currLen;
6877+
}
6878+
// If data remains, buffer it
6879+
if (currPos < data.length) {
6880+
partialBuffer = data.subarray(currPos);
6881+
}
68546882
}
6855-
#else
6856-
info.inQueue.push(data);
68576883
#endif
68586884
}
68596885
function send(data) {
@@ -6885,15 +6911,14 @@ LibraryManager.library = {
68856911
}
68866912
}
68876913
info.sender = function(data) {
6888-
#if SOCKET_FORCED_MESSAGING
6889-
var buffer = new Uint8Array(data.length+4);
6890-
i32Temp[0] = data.length;
6891-
buffer.set(i8Temp);
6892-
buffer.set(data, 4);
6893-
outQueue.push(buffer);
6894-
#else
6914+
if (!info.stream) {
6915+
// add a header with the message size
6916+
var header = new Uint8Array(4);
6917+
i32Temp[0] = data.length;
6918+
header.set(i8Temp);
6919+
outQueue.push(header);
6920+
}
68956921
outQueue.push(new Uint8Array(data));
6896-
#endif
68976922
trySend();
68986923
};
68996924
return 0;
@@ -7007,6 +7032,12 @@ LibraryManager.library = {
70077032
HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf);
70087033
bufferPos += currNum;
70097034
}
7035+
if (info.stream) {
7036+
// This is tcp (reliable), so if not all was read, keep it
7037+
if (bufferPos < bytes) {
7038+
info.inQueue.unshift(buffer.subArray(bufferPos));
7039+
}
7040+
}
70107041
return ret;
70117042
},
70127043

src/settings.js

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,6 @@ var LIBRARY_DEBUG = 0; // Print out when we enter a library call (library*.js).
147147
var GL_DEBUG = 0; // Print out all calls into WebGL. As with LIBRARY_DEBUG, you can set a runtime
148148
// option, in this case GL.debug.
149149
var SOCKET_DEBUG = 0; // Log out socket/network data transfer.
150-
var SOCKET_FORCED_MESSAGING = 0; // If 1, we make sure that each socket send ends up a single socket
151-
// receive, that is, we force proper messaging (otherwise, sending
152-
// [A] and [B] can show up on the other side as [A, B]). This will
153-
// only work if both sides have it enabled, obviously, so it only
154-
// makes sense for p2p or when connecting to a special server - we
155-
// add some metadata (message size) to messages in this mode
156150

157151
var PROFILE_MAIN_LOOP = 0; // Profile the function called in set_main_loop
158152

@@ -1197,6 +1191,8 @@ var C_DEFINES = {'SI_MESGQ': '5',
11971191
'_SC_TTY_NAME_MAX': '41',
11981192
'AF_INET': '1',
11991193
'AF_INET6': '6',
1200-
'FIONREAD': '1'
1194+
'FIONREAD': '1',
1195+
'SOCK_STREAM': '200',
1196+
'IPPROTO_TCP': 1
12011197
};
12021198

tests/runner.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10283,12 +10283,12 @@ def relay_server(q):
1028310283
return relay_server
1028410284

1028510285
def test_zz_websockets_bi(self):
10286-
for fm in [0,1]:
10286+
for datagram in [0,1]:
1028710287
try:
1028810288
with self.WebsockHarness(8992, self.make_relay_server(8992, 8994)):
1028910289
with self.WebsockHarness(8994, no_server=True):
10290-
Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-s', 'SOCKET_FORCED_MESSAGING=%d' % fm]).communicate()
10291-
self.btest('websockets_bi.c', expected='2499', args=['-s', 'SOCKET_FORCED_MESSAGING=%d' % fm])
10290+
Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-DTEST_DGRAM=%d' % datagram]).communicate()
10291+
self.btest('websockets_bi.c', expected='2499', args=['-DTEST_DGRAM=%d' % datagram])
1029210292
finally:
1029310293
self.clean_pids()
1029410294

tests/websockets_bi.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ int main(void)
8181

8282
struct sockaddr_in stSockAddr;
8383
int Res;
84+
#if !TEST_DGRAM
8485
SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
86+
#else
87+
SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
88+
#endif
8589

8690
if (-1 == SocketFD)
8791
{

tests/websockets_bi_side.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ int main(void)
1818
{
1919
struct sockaddr_in stSockAddr;
2020
int Res;
21+
#if !TEST_DGRAM
2122
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
23+
#else
24+
int SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
25+
#endif
2226

2327
if (-1 == SocketFD)
2428
{

0 commit comments

Comments
 (0)