Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
dgram: use uv_udp_try_send()
This improves dgram performance by avoiding unnecessary async
operations.

One issue with this commit is that it seems hard to actually create
conditions under which the fallback path to the async case is
actually taken, for all supported OS, so an internal CLI option
is used for testing that path.

Another caveat is that the lack of an async operation means
that there are slight timing differences (essentially `nextTick()`
rather than `setImmediate()` for the send callback).
  • Loading branch information
addaleax committed Oct 3, 2019
commit 828e08b976d049245527f57b11a38567f8624b79
20 changes: 14 additions & 6 deletions benchmark/dgram/array-vs-concat.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,25 @@ function main({ dur, len, num, type, chunks }) {

function onsendConcat() {
if (sent++ % num === 0) {
for (var i = 0; i < num; i++) {
socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend);
}
// The setImmediate() is necessary to have event loop progress on OSes
// that only perform synchronous I/O on nonblocking UDP sockets.
setImmediate(() => {
for (var i = 0; i < num; i++) {
socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend);
}
});
}
}

function onsendMulti() {
if (sent++ % num === 0) {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
// The setImmediate() is necessary to have event loop progress on OSes
// that only perform synchronous I/O on nonblocking UDP sockets.
setImmediate(() => {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
});
}
}

Expand Down
10 changes: 7 additions & 3 deletions benchmark/dgram/multi-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ function main({ dur, len, num, type, chunks }) {

function onsend() {
if (sent++ % num === 0) {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
// The setImmediate() is necessary to have event loop progress on OSes
// that only perform synchronous I/O on nonblocking UDP sockets.
setImmediate(() => {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
});
}
}

Expand Down
10 changes: 7 additions & 3 deletions benchmark/dgram/offset-length.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) {

function onsend() {
if (sent++ % num === 0) {
for (var i = 0; i < num; i++) {
socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend);
}
// The setImmediate() is necessary to have event loop progress on OSes
// that only perform synchronous I/O on nonblocking UDP sockets.
setImmediate(() => {
for (var i = 0; i < num; i++) {
socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend);
}
});
}
}

Expand Down
10 changes: 7 additions & 3 deletions benchmark/dgram/single-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) {

function onsend() {
if (sent++ % num === 0) {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
// The setImmediate() is necessary to have event loop progress on OSes
// that only perform synchronous I/O on nonblocking UDP sockets.
setImmediate(() => {
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
});
}
}

Expand Down
7 changes: 7 additions & 0 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ function doSend(ex, self, ip, list, address, port, callback) {
else
err = state.handle.send(req, list, list.length, !!callback);

if (err >= 1) {
// Synchronous finish.
if (callback)
process.nextTick(callback, null, err - 1);
Comment thread
addaleax marked this conversation as resolved.
return;
}

if (err && callback) {
// Don't emit as error, dgram_legacy.js compatibility
const ex = exceptionWithHostPort(err, 'send', address, port);
Expand Down
2 changes: 2 additions & 0 deletions src/node_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ EnvironmentOptionsParser::EnvironmentOptionsParser() {
"write warnings to file instead of stderr",
&EnvironmentOptions::redirect_warnings,
kAllowedInEnvironment);
AddOption("--test-udp-no-try-send", "", // For testing only.
Comment thread
mscdex marked this conversation as resolved.
&EnvironmentOptions::test_udp_no_try_send);
AddOption("--throw-deprecation",
"throw an exception on deprecations",
&EnvironmentOptions::throw_deprecation,
Expand Down
1 change: 1 addition & 0 deletions src/node_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class EnvironmentOptions : public Options {
bool heap_prof = false;
#endif // HAVE_INSPECTOR
std::string redirect_warnings;
bool test_udp_no_try_send = false;
bool throw_deprecation = false;
bool trace_deprecation = false;
bool trace_sync_io = false;
Expand Down
44 changes: 33 additions & 11 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
size_t count = args[2].As<Uint32>()->Value();
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();

SendWrap* req_wrap;
{
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
}
size_t msg_size = 0;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);
Expand All @@ -448,8 +443,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
msg_size += length;
}

req_wrap->msg_size = msg_size;

int err = 0;
struct sockaddr_storage addr_storage;
sockaddr* addr = nullptr;
Expand All @@ -462,18 +455,47 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
}
}

uv_buf_t* bufs_ptr = *bufs;
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
size_t sent = err;
while (count > 0 && bufs_ptr->len <= sent) {
sent -= bufs_ptr->len;
bufs_ptr++;
count--;
}
if (count > 0) {
CHECK_LE(sent, bufs_ptr->len);
Comment thread
addaleax marked this conversation as resolved.
Outdated
bufs_ptr->base += sent;
bufs_ptr->len -= sent;
} else {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
return;
}
}
}

if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
req_wrap->msg_size = msg_size;

err = req_wrap->Dispatch(uv_udp_send,
&wrap->handle_,
*bufs,
bufs_ptr,
count,
addr,
OnSend);
if (err)
delete req_wrap;
}

if (err)
delete req_wrap;

args.GetReturnValue().Set(err);
}

Expand Down
1 change: 1 addition & 0 deletions test/async-hooks/test-udpsendwrap.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Flags: --test-udp-no-try-send
'use strict';

const common = require('../common');
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-dgram-send-callback-recursive.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function onsend() {
client.on('listening', function() {
port = this.address().port;

setImmediate(function() {
process.nextTick(() => {
async = true;
});

Expand Down
2 changes: 1 addition & 1 deletion test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
// Flags: --expose-gc --expose-internals --no-warnings
// Flags: --expose-gc --expose-internals --no-warnings --test-udp-no-try-send

const common = require('../common');
const { internalBinding } = require('internal/test/binding');
Expand Down