3838
3939
4040static void uv__udp_run_completed (uv_udp_t * handle );
41- static void uv__udp_run_pending (uv_udp_t * handle );
4241static void uv__udp_io (uv_loop_t * loop , uv__io_t * w , unsigned int revents );
43- static void uv__udp_recvmsg (uv_loop_t * loop , uv__io_t * w , unsigned int revents );
44- static void uv__udp_sendmsg (uv_loop_t * loop , uv__io_t * w , unsigned int revents );
42+ static void uv__udp_recvmsg (uv_udp_t * handle );
43+ static void uv__udp_sendmsg (uv_udp_t * handle );
4544static int uv__udp_maybe_deferred_bind (uv_udp_t * handle ,
4645 int domain ,
4746 unsigned int flags );
@@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
6564 assert (!uv__io_active (& handle -> io_watcher , UV__POLLIN | UV__POLLOUT ));
6665 assert (handle -> io_watcher .fd == -1 );
6766
68- uv__udp_run_completed (handle );
69-
7067 while (!QUEUE_EMPTY (& handle -> write_queue )) {
7168 q = QUEUE_HEAD (& handle -> write_queue );
7269 QUEUE_REMOVE (q );
7370
7471 req = QUEUE_DATA (q , uv_udp_send_t , queue );
75- uv__req_unregister (handle -> loop , req );
76-
77- if (req -> bufs != req -> bufsml )
78- free (req -> bufs );
79- req -> bufs = NULL ;
80-
81- if (req -> send_cb != NULL )
82- req -> send_cb (req , - ECANCELED );
72+ req -> status = - ECANCELED ;
73+ QUEUE_INSERT_TAIL (& handle -> write_completed_queue , & req -> queue );
8374 }
8475
85- handle -> send_queue_size = 0 ;
86- handle -> send_queue_count = 0 ;
76+ uv__udp_run_completed (handle );
77+
78+ assert (handle -> send_queue_size == 0 );
79+ assert (handle -> send_queue_count == 0 );
8780
8881 /* Now tear down the handle. */
8982 handle -> recv_cb = NULL ;
@@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
9285}
9386
9487
95- static void uv__udp_run_pending (uv_udp_t * handle ) {
96- uv_udp_send_t * req ;
97- QUEUE * q ;
98- struct msghdr h ;
99- ssize_t size ;
100-
101- while (!QUEUE_EMPTY (& handle -> write_queue )) {
102- q = QUEUE_HEAD (& handle -> write_queue );
103- assert (q != NULL );
104-
105- req = QUEUE_DATA (q , uv_udp_send_t , queue );
106- assert (req != NULL );
107-
108- memset (& h , 0 , sizeof h );
109- h .msg_name = & req -> addr ;
110- h .msg_namelen = (req -> addr .ss_family == AF_INET6 ?
111- sizeof (struct sockaddr_in6 ) : sizeof (struct sockaddr_in ));
112- h .msg_iov = (struct iovec * ) req -> bufs ;
113- h .msg_iovlen = req -> nbufs ;
114-
115- do {
116- size = sendmsg (handle -> io_watcher .fd , & h , 0 );
117- }
118- while (size == -1 && errno == EINTR );
119-
120- /* TODO try to write once or twice more in the
121- * hope that the socket becomes readable again?
122- */
123- if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ))
124- break ;
125-
126- req -> status = (size == -1 ? - errno : size );
127-
128- /* Sending a datagram is an atomic operation: either all data
129- * is written or nothing is (and EMSGSIZE is raised). That is
130- * why we don't handle partial writes. Just pop the request
131- * off the write queue and onto the completed queue, done.
132- */
133- handle -> send_queue_size -= uv__count_bufs (req -> bufs , req -> nbufs );
134- handle -> send_queue_count -- ;
135- QUEUE_REMOVE (& req -> queue );
136- QUEUE_INSERT_TAIL (& handle -> write_completed_queue , & req -> queue );
137- }
138- }
139-
140-
14188static void uv__udp_run_completed (uv_udp_t * handle ) {
14289 uv_udp_send_t * req ;
14390 QUEUE * q ;
@@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
14996 req = QUEUE_DATA (q , uv_udp_send_t , queue );
15097 uv__req_unregister (handle -> loop , req );
15198
99+ handle -> send_queue_size -= uv__count_bufs (req -> bufs , req -> nbufs );
100+ handle -> send_queue_count -- ;
101+
152102 if (req -> bufs != req -> bufsml )
153103 free (req -> bufs );
154104 req -> bufs = NULL ;
@@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
164114 else
165115 req -> send_cb (req , req -> status );
166116 }
117+
118+ if (QUEUE_EMPTY (& handle -> write_queue )) {
119+ /* Pending queue and completion queue empty, stop watcher. */
120+ uv__io_stop (handle -> loop , & handle -> io_watcher , UV__POLLOUT );
121+ if (!uv__io_active (& handle -> io_watcher , UV__POLLIN ))
122+ uv__handle_stop (handle );
123+ }
167124}
168125
169126
170127static void uv__udp_io (uv_loop_t * loop , uv__io_t * w , unsigned int revents ) {
128+ uv_udp_t * handle ;
129+
130+ handle = container_of (w , uv_udp_t , io_watcher );
131+ assert (handle -> type == UV_UDP );
132+
171133 if (revents & UV__POLLIN )
172- uv__udp_recvmsg (loop , w , revents );
134+ uv__udp_recvmsg (handle );
173135
174- if (revents & UV__POLLOUT )
175- uv__udp_sendmsg (loop , w , revents );
136+ if (revents & UV__POLLOUT ) {
137+ uv__udp_sendmsg (handle );
138+ uv__udp_run_completed (handle );
139+ }
176140}
177141
178142
179- static void uv__udp_recvmsg (uv_loop_t * loop ,
180- uv__io_t * w ,
181- unsigned int revents ) {
143+ static void uv__udp_recvmsg (uv_udp_t * handle ) {
182144 struct sockaddr_storage peer ;
183145 struct msghdr h ;
184- uv_udp_t * handle ;
185146 ssize_t nread ;
186147 uv_buf_t buf ;
187148 int flags ;
188149 int count ;
189150
190- handle = container_of (w , uv_udp_t , io_watcher );
191- assert (handle -> type == UV_UDP );
192- assert (revents & UV__POLLIN );
193-
194151 assert (handle -> recv_cb != NULL );
195152 assert (handle -> alloc_cb != NULL );
196153
@@ -247,34 +204,46 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
247204}
248205
249206
250- static void uv__udp_sendmsg (uv_loop_t * loop ,
251- uv__io_t * w ,
252- unsigned int revents ) {
253- uv_udp_t * handle ;
254-
255- handle = container_of (w , uv_udp_t , io_watcher );
256- assert (handle -> type == UV_UDP );
257- assert (revents & UV__POLLOUT );
207+ static void uv__udp_sendmsg (uv_udp_t * handle ) {
208+ uv_udp_send_t * req ;
209+ QUEUE * q ;
210+ struct msghdr h ;
211+ ssize_t size ;
258212
259213 assert (!QUEUE_EMPTY (& handle -> write_queue )
260214 || !QUEUE_EMPTY (& handle -> write_completed_queue ));
261215
262- /* Write out pending data first. */
263- uv__udp_run_pending (handle );
216+ while (!QUEUE_EMPTY (& handle -> write_queue )) {
217+ q = QUEUE_HEAD (& handle -> write_queue );
218+ assert (q != NULL );
264219
265- /* Drain 'request completed' queue. */
266- uv__udp_run_completed ( handle );
220+ req = QUEUE_DATA ( q , uv_udp_send_t , queue );
221+ assert ( req != NULL );
267222
268- if (!QUEUE_EMPTY (& handle -> write_completed_queue )) {
269- /* Schedule completion callbacks. */
270- uv__io_feed (handle -> loop , & handle -> io_watcher );
271- }
272- else if (QUEUE_EMPTY (& handle -> write_queue )) {
273- /* Pending queue and completion queue empty, stop watcher. */
274- uv__io_stop (loop , & handle -> io_watcher , UV__POLLOUT );
223+ memset (& h , 0 , sizeof h );
224+ h .msg_name = & req -> addr ;
225+ h .msg_namelen = (req -> addr .ss_family == AF_INET6 ?
226+ sizeof (struct sockaddr_in6 ) : sizeof (struct sockaddr_in ));
227+ h .msg_iov = (struct iovec * ) req -> bufs ;
228+ h .msg_iovlen = req -> nbufs ;
275229
276- if (!uv__io_active (& handle -> io_watcher , UV__POLLIN ))
277- uv__handle_stop (handle );
230+ do {
231+ size = sendmsg (handle -> io_watcher .fd , & h , 0 );
232+ } while (size == -1 && errno == EINTR );
233+
234+ if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ))
235+ break ;
236+
237+ req -> status = (size == -1 ? - errno : size );
238+
239+ /* Sending a datagram is an atomic operation: either all data
240+ * is written or nothing is (and EMSGSIZE is raised). That is
241+ * why we don't handle partial writes. Just pop the request
242+ * off the write queue and onto the completed queue, done.
243+ */
244+ QUEUE_REMOVE (& req -> queue );
245+ QUEUE_INSERT_TAIL (& handle -> write_completed_queue , & req -> queue );
246+ uv__io_feed (handle -> loop , & handle -> io_watcher );
278247 }
279248}
280249
@@ -415,15 +384,21 @@ int uv__udp_send(uv_udp_send_t* req,
415384 unsigned int addrlen ,
416385 uv_udp_send_cb send_cb ) {
417386 int err ;
387+ int empty_queue ;
418388
419389 assert (nbufs > 0 );
420390
421391 err = uv__udp_maybe_deferred_bind (handle , addr -> sa_family , 0 );
422392 if (err )
423393 return err ;
424394
425- uv__req_init (handle -> loop , req , UV_UDP_SEND );
395+ /* It's legal for send_queue_count > 0 even when the write_queue is empty;
396+ * it means there are error-state requests in the write_completed_queue that
397+ * will touch up send_queue_size/count later.
398+ */
399+ empty_queue = (handle -> send_queue_count == 0 );
426400
401+ uv__req_init (handle -> loop , req , UV_UDP_SEND );
427402 assert (addrlen <= sizeof (req -> addr ));
428403 memcpy (& req -> addr , addr , addrlen );
429404 req -> send_cb = send_cb ;
@@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
441416 handle -> send_queue_size += uv__count_bufs (req -> bufs , req -> nbufs );
442417 handle -> send_queue_count ++ ;
443418 QUEUE_INSERT_TAIL (& handle -> write_queue , & req -> queue );
444- uv__io_start (handle -> loop , & handle -> io_watcher , UV__POLLOUT );
445419 uv__handle_start (handle );
446420
421+ if (empty_queue )
422+ uv__udp_sendmsg (handle );
423+ else
424+ uv__io_start (handle -> loop , & handle -> io_watcher , UV__POLLOUT );
425+
447426 return 0 ;
448427}
449428
0 commit comments