@@ -44,6 +44,13 @@ typedef struct _socket_obj_t {
4444 mp_obj_base_t base ;
4545 struct net_context * ctx ;
4646 struct k_fifo recv_q ;
47+ struct net_buf * cur_buf ;
48+
49+ #define STATE_NEW 0
50+ #define STATE_CONNECTING 1
51+ #define STATE_CONNECTED 2
52+ #define STATE_PEER_CLOSED 3
53+ int8_t state ;
4754} socket_obj_t ;
4855
4956STATIC const mp_obj_type_t socket_type ;
@@ -103,6 +110,19 @@ static void sock_received_cb(struct net_context *context, struct net_buf *net_bu
103110 }
104111 DEBUG_printf ("\n" );
105112
113+ // if net_buf == NULL, EOF
114+ if (net_buf == NULL ) {
115+ // TODO: k_fifo accessor for this?
116+ struct net_buf * last_buf = (struct net_buf * )sys_slist_peek_tail (& socket -> recv_q .data_q );
117+ // We abuse "buf_sent" flag to store EOF flag
118+ net_nbuf_set_buf_sent (last_buf , true);
119+ DEBUG_printf ("Set EOF flag on %p\n" , last_buf );
120+ return ;
121+ }
122+
123+ // Make sure that "EOF flag" is not set
124+ net_nbuf_set_buf_sent (net_buf , false);
125+
106126 // net_buf->frags will be overwritten by fifo, so save it
107127 net_nbuf_set_token (net_buf , net_buf -> frags );
108128 k_fifo_put (& socket -> recv_q , net_buf );
@@ -126,6 +146,8 @@ STATIC mp_obj_t socket_make_new(const mp_obj_type_t *type, size_t n_args, size_t
126146 socket_obj_t * socket = m_new_obj_with_finaliser (socket_obj_t );
127147 socket -> base .type = type ;
128148 k_fifo_init (& socket -> recv_q );
149+ socket -> cur_buf = NULL ;
150+ socket -> state = STATE_NEW ;
129151
130152 int family = AF_INET ;
131153 int socktype = SOCK_STREAM ;
@@ -226,6 +248,61 @@ STATIC mp_obj_t socket_recv(mp_obj_t self_in, mp_obj_t len_in) {
226248 net_buf_gather (net_buf , vstr .buf , recv_len );
227249 net_nbuf_unref (net_buf );
228250
251+ } else if (sock_type == SOCK_STREAM ) {
252+
253+ do {
254+ if (socket -> state == STATE_PEER_CLOSED ) {
255+ return mp_const_empty_bytes ;
256+ }
257+
258+ unsigned header_len = 0 ;
259+ if (socket -> cur_buf == NULL ) {
260+ DEBUG_printf ("TCP recv: no cur_buf, getting\n" );
261+ struct net_buf * net_buf = k_fifo_get (& socket -> recv_q , K_FOREVER );
262+ // Restore ->frags overwritten by fifo
263+ net_buf -> frags = net_nbuf_token (net_buf );
264+
265+ header_len = net_nbuf_appdata (net_buf ) - net_buf -> frags -> data ;
266+ DEBUG_printf ("TCP recv: new cur_buf: %p, hdr_len: %u\n" , net_buf , header_len );
267+ socket -> cur_buf = net_buf ;
268+ }
269+
270+ struct net_buf * frag = socket -> cur_buf -> frags ;
271+ if (frag == NULL ) {
272+ printf ("net_buf has empty fragments on start!\n" );
273+ assert (0 );
274+ }
275+
276+ net_buf_pull (frag , header_len );
277+ unsigned frag_len = frag -> len ;
278+ recv_len = frag_len ;
279+ if (recv_len > max_len ) {
280+ recv_len = max_len ;
281+ }
282+ DEBUG_printf ("%d data bytes in head frag, going to read %d\n" , frag_len , recv_len );
283+
284+ vstr_init_len (& vstr , recv_len );
285+ memcpy (vstr .buf , frag -> data , recv_len );
286+
287+ if (recv_len != frag_len ) {
288+ net_buf_pull (frag , recv_len );
289+ } else {
290+ frag = net_buf_frag_del (socket -> cur_buf , frag );
291+ if (frag == NULL ) {
292+ DEBUG_printf ("Finished processing net_buf %p\n" , socket -> cur_buf );
293+ // If "buf_sent" flag was set, it's last packet and we reached EOF
294+ if (net_nbuf_buf_sent (socket -> cur_buf )) {
295+ socket -> state = STATE_PEER_CLOSED ;
296+ }
297+ net_nbuf_unref (socket -> cur_buf );
298+ socket -> cur_buf = NULL ;
299+ }
300+ }
301+ // Keep repeating while we're getting empty fragments
302+ // Zephyr IP stack appears to feed empty net_buf's with empty
303+ // frags for various TCP control packets.
304+ } while (recv_len == 0 );
305+
229306 } else {
230307 mp_not_implemented ("" );
231308 }
0 commit comments