@@ -6802,8 +6802,13 @@ LibraryManager.library = {
68026802 socket : function ( family , type , protocol ) {
68036803 var fd = Sockets . nextFd ++ ;
68046804 assert ( fd < 64 ) ; // select() assumes socket fd values are in 0..63
6805+ var stream = type == { { { cDefine ( 'SOCK_STREAM' ) } } } ;
6806+ if ( protocol ) {
6807+ assert ( stream == ( protocol == { { { cDefine ( 'IPPROTO_TCP' ) } } } ) ) ; // if stream, must be tcp
6808+ }
68056809 Sockets . fds [ fd ] = {
6806- connected : false
6810+ connected : false ,
6811+ stream : stream
68076812 } ;
68086813 return fd ;
68096814 } ,
@@ -6832,28 +6837,49 @@ LibraryManager.library = {
68326837 var i8Temp = new Uint8Array ( i32Temp . buffer ) ;
68336838
68346839 info . inQueue = [ ] ;
6840+ if ( ! info . stream ) {
6841+ var partialBuffer = null ; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message
6842+ }
6843+
68356844 info . socket . onmessage = function ( event ) {
68366845 assert ( typeof event . data !== 'string' && event . data . byteLength ) ; // must get binary data!
68376846 var data = new Uint8Array ( event . data ) ; // make a typed array view on the array buffer
68386847#if SOCKET_DEBUG
68396848 Module . print ( [ 'onmessage' , data . length , '|' , Array . prototype . slice . call ( data ) ] ) ;
68406849#endif
6841- #if SOCKET_FORCED_MESSAGING
6842- var start = 0 ;
6843- while ( start + 4 < data . length ) {
6844- i8Temp . set ( data . subarray ( start , start + 4 ) ) ;
6845- var currLen = i32Temp [ 0 ] ;
6846- assert ( currLen > 0 ) ;
6847- start += 4 ;
6848- assert ( start + currLen <= data . length , [ data . length , start , currLen ] ) ; // must not receive fractured messages!
6849- info . inQueue . push ( data . subarray ( start , start + currLen ) ) ;
6850+ if ( info . stream ) {
6851+ info . inQueue . push ( data ) ;
6852+ } else {
6853+ // we added headers with message sizes, read those to find discrete messages
6854+ if ( partialBuffer ) {
6855+ // append to the partial buffer
6856+ var newBuffer = new Uint8Array ( partialBuffer . length + data . length ) ;
6857+ newBuffer . set ( partialBuffer ) ;
6858+ newBuffer . set ( data , partialBuffer . length ) ;
6859+ // forget the partial buffer and work on data
6860+ data = newBuffer ;
6861+ partialBuffer = null ;
6862+ }
6863+ var currPos = 0 ;
6864+ while ( currPos + 4 < data . length ) {
6865+ i8Temp . set ( data . subarray ( currPos , currPos + 4 ) ) ;
6866+ var currLen = i32Temp [ 0 ] ;
6867+ assert ( currLen > 0 ) ;
6868+ if ( currPos + 4 + currLen > data . length ) {
6869+ break ; // not enough data has arrived
6870+ }
6871+ currPos += 4 ;
68506872#if SOCKET_DEBUG
6851- Module . print ( [ 'onmessage message' , currLen , '|' , Array . prototype . slice . call ( data . subarray ( start , start + currLen ) ) ] ) ;
6873+ Module . print ( [ 'onmessage message' , currLen , '|' , Array . prototype . slice . call ( data . subarray ( currPos , currPos + currLen ) ) ] ) ;
68526874#endif
6853- start += currLen ;
6875+ info . inQueue . push ( data . subarray ( currPos , currPos + currLen ) ) ;
6876+ currPos += currLen ;
6877+ }
6878+ // If data remains, buffer it
6879+ if ( currPos < data . length ) {
6880+ partialBuffer = data . subarray ( currPos ) ;
6881+ }
68546882 }
6855- #else
6856- info . inQueue . push ( data ) ;
68576883#endif
68586884 }
68596885 function send ( data ) {
@@ -6885,15 +6911,14 @@ LibraryManager.library = {
68856911 }
68866912 }
68876913 info . sender = function ( data ) {
6888- # if SOCKET_FORCED_MESSAGING
6889- var buffer = new Uint8Array ( data . length + 4 ) ;
6890- i32Temp [ 0 ] = data . length ;
6891- buffer . set ( i8Temp ) ;
6892- buffer . set ( data , 4 ) ;
6893- outQueue . push ( buffer ) ;
6894- #else
6914+ if ( ! info . stream ) {
6915+ // add a header with the message size
6916+ var header = new Uint8Array ( 4 ) ;
6917+ i32Temp [ 0 ] = data . length ;
6918+ header . set ( i8Temp ) ;
6919+ outQueue . push ( header ) ;
6920+ }
68956921 outQueue . push ( new Uint8Array ( data ) ) ;
6896- #endif
68976922 trySend ( ) ;
68986923 } ;
68996924 return 0 ;
@@ -7007,6 +7032,12 @@ LibraryManager.library = {
70077032 HEAPU8 . set ( buffer . subarray ( bufferPos , bufferPos + currNum ) , currBuf ) ;
70087033 bufferPos += currNum ;
70097034 }
7035+ if ( info . stream ) {
7036+ // This is tcp (reliable), so if not all was read, keep it
7037+ if ( bufferPos < bytes ) {
7038+ info . inQueue . unshift ( buffer . subArray ( bufferPos ) ) ;
7039+ }
7040+ }
70107041 return ret ;
70117042 } ,
70127043
0 commit comments