Skip to content

Commit 1e86ca5

Browse files
authored
[backport][iomgr][EventEngine] Improve server handling of file descriptor exhaustion (#33672)
Backport of #33656
1 parent aff3066 commit 1e86ca5

7 files changed

Lines changed: 106 additions & 14 deletions

File tree

src/core/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,6 +1909,7 @@ grpc_cc_library(
19091909
"posix_event_engine_tcp_socket_utils",
19101910
"socket_mutator",
19111911
"status_helper",
1912+
"time",
19121913
"//:event_engine_base_hdrs",
19131914
"//:gpr",
19141915
],

src/core/lib/event_engine/posix_engine/posix_engine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
196196
const DNSResolver::ResolverOptions& options) override;
197197
void Run(Closure* closure) override;
198198
void Run(absl::AnyInvocable<void()> closure) override;
199+
// Caution!! The timer implementation cannot create any fds. See #20418.
199200
TaskHandle RunAfter(Duration when, Closure* closure) override;
200201
TaskHandle RunAfter(Duration when,
201202
absl::AnyInvocable<void()> closure) override;

src/core/lib/event_engine/posix_engine/posix_engine_listener.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
#include <sys/socket.h> // IWYU pragma: keep
2424
#include <unistd.h> // IWYU pragma: keep
2525

26+
#include <atomic>
2627
#include <string>
28+
#include <tuple>
2729
#include <utility>
2830

2931
#include "absl/functional/any_invocable.h"
@@ -41,6 +43,7 @@
4143
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
4244
#include "src/core/lib/event_engine/tcp_socket_utils.h"
4345
#include "src/core/lib/gprpp/status_helper.h"
46+
#include "src/core/lib/gprpp/time.h"
4447
#include "src/core/lib/iomgr/socket_mutator.h"
4548

4649
namespace grpc_event_engine {
@@ -133,6 +136,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
133136
switch (errno) {
134137
case EINTR:
135138
continue;
139+
case EMFILE:
140+
// When the process runs out of fds, accept4() returns EMFILE. When
141+
// this happens, the connection is left in the accept queue until
142+
// either a read event triggers the on_read callback, or time has
143+
// passed and the accept should be re-tried regardless. This callback
144+
// is not cancelled, so a spurious wakeup may occur even when there's
145+
// nothing to accept. This is not a performant code path, but if an fd
146+
// limit has been reached, the system is likely in an unhappy state
147+
// regardless.
148+
GRPC_LOG_EVERY_N_SEC(1, "%s",
149+
"File descriptor limit reached. Retrying.");
150+
handle_->NotifyOnRead(notify_on_accept_);
151+
// Do not schedule another timer if one is already armed.
152+
if (retry_timer_armed_.exchange(true)) return;
153+
// Hold a ref while the retry timer is waiting, to prevent listener
154+
// destruction and the races that would ensue.
155+
Ref();
156+
std::ignore =
157+
engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
158+
retry_timer_armed_.store(false);
159+
if (!handle_->IsHandleShutdown()) {
160+
handle_->SetReadable();
161+
}
162+
Unref();
163+
});
164+
return;
136165
case EAGAIN:
137166
case ECONNABORTED:
138167
handle_->NotifyOnRead(notify_on_accept_);

src/core/lib/event_engine/posix_engine/posix_engine_listener.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ class PosixEngineListenerImpl
121121
ListenerSocketsContainer::ListenerSocket socket_;
122122
EventHandle* handle_;
123123
PosixEngineClosure* notify_on_accept_;
124+
// Tracks the status of a backup timer to retry accept4 calls after file
125+
// descriptor exhaustion.
126+
std::atomic<bool> retry_timer_armed_{false};
124127
};
125128
class ListenerAsyncAcceptors : public ListenerSocketsContainer {
126129
public:

src/core/lib/iomgr/tcp_server_posix.cc

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
//
1717
//
1818

19+
#include <grpc/support/port_platform.h>
20+
21+
#include <utility>
22+
23+
#include <grpc/support/atm.h>
24+
1925
// FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
2026
#ifndef _GNU_SOURCE
2127
#define _GNU_SOURCE
2228
#endif
2329

24-
#include <grpc/support/port_platform.h>
25-
2630
#include "src/core/lib/iomgr/port.h"
2731

2832
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
@@ -45,6 +49,7 @@
4549

4650
#include <grpc/byte_buffer.h>
4751
#include <grpc/event_engine/endpoint_config.h>
52+
#include <grpc/event_engine/event_engine.h>
4853
#include <grpc/support/alloc.h>
4954
#include <grpc/support/log.h>
5055
#include <grpc/support/sync.h>
@@ -74,6 +79,8 @@
7479
#include "src/core/lib/transport/error_utils.h"
7580

7681
static std::atomic<int64_t> num_dropped_connections{0};
82+
static constexpr grpc_core::Duration kRetryAcceptWaitTime{
83+
grpc_core::Duration::Seconds(1)};
7784

7885
using ::grpc_event_engine::experimental::EndpointConfig;
7986
using ::grpc_event_engine::experimental::EventEngine;
@@ -350,22 +357,38 @@ static void on_read(void* arg, grpc_error_handle err) {
350357
if (fd < 0) {
351358
if (errno == EINTR) {
352359
continue;
353-
} else if (errno == EAGAIN || errno == ECONNABORTED ||
354-
errno == EWOULDBLOCK) {
360+
}
361+
// When the process runs out of fds, accept4() returns EMFILE. When this
362+
// happens, the connection is left in the accept queue until either a
363+
// read event triggers the on_read callback, or time has passed and the
364+
// accept should be re-tried regardless. This callback is not cancelled,
365+
// so a spurious wakeup may occur even when there's nothing to accept.
366+
// This is not a performant code path, but if an fd limit has been
367+
// reached, the system is likely in an unhappy state regardless.
368+
if (errno == EMFILE) {
369+
GRPC_LOG_EVERY_N_SEC(1, "%s",
370+
"File descriptor limit reached. Retrying.");
371+
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
372+
if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
373+
grpc_timer_init(&sp->retry_timer,
374+
grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
375+
&sp->retry_closure);
376+
return;
377+
}
378+
if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
355379
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
356380
return;
381+
}
382+
gpr_mu_lock(&sp->server->mu);
383+
if (!sp->server->shutdown_listeners) {
384+
gpr_log(GPR_ERROR, "Failed accept4: %s",
385+
grpc_core::StrError(errno).c_str());
357386
} else {
358-
gpr_mu_lock(&sp->server->mu);
359-
if (!sp->server->shutdown_listeners) {
360-
gpr_log(GPR_ERROR, "Failed accept4: %s",
361-
grpc_core::StrError(errno).c_str());
362-
} else {
363-
// if we have shutdown listeners, accept4 could fail, and we
364-
// needn't notify users
365-
}
366-
gpr_mu_unlock(&sp->server->mu);
367-
goto error;
387+
// if we have shutdown listeners, accept4 could fail, and we
388+
// needn't notify users
368389
}
390+
gpr_mu_unlock(&sp->server->mu);
391+
goto error;
369392
}
370393

371394
if (sp->server->memory_quota->IsMemoryPressureHigh()) {
@@ -558,6 +581,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
558581
sp->port_index = listener->port_index;
559582
sp->fd_index = listener->fd_index + count - i;
560583
GPR_ASSERT(sp->emfd);
584+
grpc_tcp_server_listener_initialize_retry_timer(sp);
561585
while (listener->server->tail->next != nullptr) {
562586
listener->server->tail = listener->server->tail->next;
563587
}
@@ -791,6 +815,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
791815
if (s->active_ports) {
792816
grpc_tcp_listener* sp;
793817
for (sp = s->head; sp; sp = sp->next) {
818+
grpc_timer_cancel(&sp->retry_timer);
794819
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
795820
}
796821
}

src/core/lib/iomgr/tcp_server_utils_posix.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "src/core/lib/iomgr/resolve_address.h"
3131
#include "src/core/lib/iomgr/socket_utils_posix.h"
3232
#include "src/core/lib/iomgr/tcp_server.h"
33+
#include "src/core/lib/iomgr/timer.h"
3334
#include "src/core/lib/resource_quota/memory_quota.h"
3435

3536
// one listening port
@@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener {
5253
// identified while iterating through 'next'.
5354
struct grpc_tcp_listener* sibling;
5455
int is_sibling;
56+
// If an accept4() call fails with EMFILE, a timer is started to drain the
57+
// accept queue in case no further connection attempts reach the gRPC server.
58+
grpc_closure retry_closure;
59+
grpc_timer retry_timer;
60+
gpr_atm retry_timer_armed;
5561
} grpc_tcp_listener;
5662

5763
// the overall server
@@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
139145
// Return true if the platform supports ifaddrs
140146
bool grpc_tcp_server_have_ifaddrs(void);
141147

148+
// Initialize (but don't start) the timer and callback to retry accept4() on a
149+
// listening socket after file descriptors have been exhausted. This must be
150+
// called when creating a new listener.
151+
void grpc_tcp_server_listener_initialize_retry_timer(
152+
grpc_tcp_listener* listener);
153+
142154
#endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H

src/core/lib/iomgr/tcp_server_utils_posix_common.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
#include <grpc/support/port_platform.h>
2020

21+
#include <grpc/support/atm.h>
22+
2123
#include "src/core/lib/iomgr/port.h"
2224

2325
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
@@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) {
8183
return s_max_accept_queue_size;
8284
}
8385

86+
static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
87+
// Do nothing if cancelled.
88+
if (!err.ok()) return;
89+
grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
90+
gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
91+
if (!grpc_fd_is_shutdown(listener->emfd)) {
92+
grpc_fd_set_readable(listener->emfd);
93+
}
94+
}
95+
96+
void grpc_tcp_server_listener_initialize_retry_timer(
97+
grpc_tcp_listener* listener) {
98+
gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
99+
grpc_timer_init_unset(&listener->retry_timer);
100+
GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
101+
grpc_schedule_on_exec_ctx);
102+
}
103+
84104
static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
85105
const grpc_resolved_address* addr,
86106
unsigned port_index,
@@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
112132
sp->server = s;
113133
sp->fd = fd;
114134
sp->emfd = grpc_fd_create(fd, name.c_str(), true);
135+
grpc_tcp_server_listener_initialize_retry_timer(sp);
115136

116137
// Check and set fd as preallocated
117138
if (grpc_tcp_server_pre_allocated_fd(s) == fd) {

0 commit comments

Comments
 (0)