@@ -116,17 +116,25 @@ public class GrpcServerImpl implements RPCServer {
116116
117117 private static final long NANOSECONDS_IN_MS = TimeUnit .MILLISECONDS .toNanos (1 );
118118
119+ private static final long NANOS_PER_IDLE_CHECK =
120+ TimeUnit .NANOSECONDS .convert (5 , TimeUnit .SECONDS );
121+
119122 private class RunningCommand implements AutoCloseable {
120123 private final Thread thread ;
121124 private final String id ;
122125
123- private RunningCommand () {
126+ private RunningCommand () throws InterruptedException {
124127 thread = Thread .currentThread ();
125128 id = UUID .randomUUID ().toString ();
126129 synchronized (runningCommands ) {
127130 if (runningCommands .isEmpty ()) {
128131 busy ();
129132 }
133+
134+ if (shuttingDown ) {
135+ throw new InterruptedException ();
136+ }
137+
130138 runningCommands .put (id , this );
131139 runningCommands .notify ();
132140 }
@@ -443,21 +451,57 @@ public void write(int byteAsInt) throws IOException {
443451 }
444452 }
445453
454+ // The synchronized block is here so that if the "PID file deleted" timer or the idle shutdown
455+ // mechanism kicks in during a regular shutdown, they don't race.
456+ @ VisibleForTesting // productionVisibility = Visibility.PRIVATE
457+ void signalShutdown () {
458+ synchronized (runningCommands ) {
459+ shuttingDown = true ;
460+ server .shutdown ();
461+ }
462+ }
463+
446464 /**
447- * A thread that watches if the PID file changes and shuts down the server immediately if so.
465+ * A thread that shuts the server down under the following conditions:
466+ *
467+ * <ul>
468+ * <li>The PID file changes (in this case, *very* quickly)
469+ * <li>The workspace directory is deleted
470+ * <li>There is too much memory pressure on the host
471+ * </ul>
448472 */
449- private class PidFileWatcherThread extends Thread {
450- private boolean shuttingDown = false ;
473+ private class ShutdownWatcherThread extends Thread {
474+ private long lastIdleCheckNanos ;
451475
452- private PidFileWatcherThread () {
453- super ("pid-file -watcher" );
476+ private ShutdownWatcherThread () {
477+ super ("grpc-server-shutdown -watcher" );
454478 setDaemon (true );
455479 }
456480
457- // The synchronized block is here so that if the "PID file deleted" timer kicks in during a
458- // regular shutdown, they don't race.
459- private synchronized void signalShutdown () {
460- shuttingDown = true ;
481+ private void doIdleChecksMaybe () {
482+ synchronized (runningCommands ) {
483+ if (!runningCommands .isEmpty ()) {
484+ lastIdleCheckNanos = -1 ;
485+ return ;
486+ }
487+
488+ long currentNanos = BlazeClock .nanoTime ();
489+ if (lastIdleCheckNanos == -1 ) {
490+ lastIdleCheckNanos = currentNanos ;
491+ return ;
492+ }
493+
494+ if (currentNanos - lastIdleCheckNanos < NANOS_PER_IDLE_CHECK ) {
495+ return ;
496+ }
497+
498+ if (!idleServerTasks .continueProcessing ()) {
499+ signalShutdown ();
500+ server .shutdown ();
501+ }
502+
503+ lastIdleCheckNanos = currentNanos ;
504+ }
461505 }
462506
463507 @ Override
@@ -473,8 +517,12 @@ public void run() {
473517 // Handled by virtue of ok not being set to true
474518 }
475519
520+ if (ok ) {
521+ doIdleChecksMaybe ();
522+ }
523+
476524 if (!ok ) {
477- synchronized (PidFileWatcherThread .this ) {
525+ synchronized (ShutdownWatcherThread .this ) {
478526 if (shuttingDown ) {
479527 logger .warning ("PID file deleted or overwritten but shutdown is already in progress" );
480528 break ;
@@ -510,24 +558,21 @@ public void run() {
510558 private final String responseCookie ;
511559 private final AtomicLong interruptCounter = new AtomicLong (0 );
512560 private final int maxIdleSeconds ;
513- private final PidFileWatcherThread pidFileWatcherThread ;
561+ private final ShutdownWatcherThread shutdownWatcherThread ;
514562 private final Path pidFile ;
515563 private final String pidInFile ;
516564 private final List <Path > filesToDeleteAtExit = new ArrayList <>();
517565 private final int port ;
518566
519567 private Server server ;
520568 private IdleServerTasks idleServerTasks ;
521- boolean serving ;
569+ private InetSocketAddress address ;
570+ private boolean serving ;
571+ private boolean shuttingDown = false ;
522572
523573 public GrpcServerImpl (CommandExecutor commandExecutor , Clock clock , int port ,
524574 Path workspace , Path serverDirectory , int maxIdleSeconds ) throws IOException {
525- Runtime .getRuntime ().addShutdownHook (new Thread () {
526- @ Override
527- public void run () {
528- shutdownHook ();
529- }
530- });
575+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> shutdownHook ()));
531576
532577 // server.pid was written in the C++ launcher after fork() but before exec() .
533578 // The client only accesses the pid file after connecting to the socket
@@ -556,12 +601,22 @@ public void run() {
556601 requestCookie = generateCookie (random , 16 );
557602 responseCookie = generateCookie (random , 16 );
558603
559- pidFileWatcherThread = new PidFileWatcherThread ();
560- pidFileWatcherThread .start ();
604+ shutdownWatcherThread = new ShutdownWatcherThread ();
605+ shutdownWatcherThread .start ();
561606 idleServerTasks = new IdleServerTasks (workspace );
562607 idleServerTasks .idle ();
563608 }
564609
610+ @ VisibleForTesting // productionVisibility = Visibility.PRIVATE
611+ String getRequestCookie () {
612+ return requestCookie ;
613+ }
614+
615+ @ VisibleForTesting // productionVisibility = Visibility.PRIVATE
616+ InetSocketAddress getAddress () {
617+ return address ;
618+ }
619+
565620 private void idle () {
566621 Preconditions .checkState (idleServerTasks == null );
567622 idleServerTasks = new IdleServerTasks (workspace );
@@ -646,7 +701,7 @@ private void timeoutThread() {
646701 }
647702 }
648703
649- server . shutdown ();
704+ signalShutdown ();
650705 }
651706
652707 /**
@@ -672,7 +727,7 @@ private void timeoutThread() {
672727 @ Override
673728 public void prepareForAbruptShutdown () {
674729 disableShutdownHooks ();
675- pidFileWatcherThread . signalShutdown ();
730+ signalShutdown ();
676731 }
677732
678733 @ Override
@@ -719,7 +774,7 @@ public void serve() throws IOException {
719774 timeoutThread .start ();
720775 }
721776 serving = true ;
722-
777+ this . address = new InetSocketAddress ( address . getAddress (), server . getPort ());
723778 writeServerFile (
724779 PORT_FILE , InetAddresses .toUriString (address .getAddress ()) + ":" + server .getPort ());
725780 writeServerFile (REQUEST_COOKIE_FILE , requestCookie );
@@ -761,9 +816,7 @@ private void shutdownHook() {
761816 }
762817 }
763818
764- /**
765- * Schedule the specified file for (attempted) deletion at JVM exit.
766- */
819+ /** Schedule the specified file for (attempted) deletion at JVM exit. */
767820 protected void deleteAtExit (final Path path ) {
768821 synchronized (filesToDeleteAtExit ) {
769822 filesToDeleteAtExit .add (path );
@@ -783,8 +836,8 @@ static void printStack(IOException e) {
783836 logger .severe (err .toString ());
784837 }
785838
786- private void executeCommand (
787- RunRequest request , StreamObserver <RunResponse > observer , GrpcSink sink ) {
839+ @ VisibleForTesting // productionVisibility = Visibility.PRIVATE
840+ void executeCommand ( RunRequest request , StreamObserver <RunResponse > observer , GrpcSink sink ) {
788841 sink .setCommandThread (Thread .currentThread ());
789842
790843 if (!request .getCookie ().equals (requestCookie ) || request .getClientDescription ().isEmpty ()) {
@@ -880,7 +933,7 @@ private void executeCommand(
880933
881934 boolean shutdown = commandExecutor .shutdown ();
882935 if (shutdown ) {
883- server . shutdown ();
936+ signalShutdown ();
884937 }
885938 RunResponse response =
886939 RunResponse .newBuilder ()
@@ -927,6 +980,8 @@ public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObs
927980
928981 streamObserver .onNext (response .build ());
929982 streamObserver .onCompleted ();
983+ } catch (InterruptedException e ) {
984+ // Ignore, we are shutting down anyway
930985 }
931986 }
932987
@@ -969,6 +1024,8 @@ private void doCancel(
9691024 logger .info (
9701025 "Client cancelled RPC of cancellation request for " + request .getCommandId ());
9711026 }
1027+ } catch (InterruptedException e ) {
1028+ // Ignore, we are shutting down anyway
9721029 }
9731030 }
9741031 };
0 commit comments