2121import java .util .concurrent .Future ;
2222import java .util .concurrent .ScheduledExecutorService ;
2323import java .util .concurrent .ScheduledFuture ;
24- import java .util .concurrent .Semaphore ;
2524import java .util .concurrent .TimeUnit ;
2625import java .util .concurrent .locks .Condition ;
2726import java .util .concurrent .locks .Lock ;
8180 * <li>{@code null} -- context hasn't been {@link #start}ed yet. The context
8281 * SHOULD NOT be used in this state, as it will likely result in an NPE.
8382 * <li>AwaitStarted -- client's opened the stream, sent Start,
84- * and is now awaiting for the server to respond with Started.
83+ * and is now awaiting the server to respond with Started.
8584 * <li>Active -- the server is ready to accept the next Data message.
8685 * <li>InFlight -- the latest batch has been sent, awaiting Acks.
8786 * <li>OOM -- server has OOM'ed and will not accept any more data.
@@ -145,8 +144,8 @@ public final class BatchContext<PropertiesT> implements Closeable {
145144 * <p>
146145 * In the event of abrupt stream termination ({@link Recv#onError} is called),
147146 * the "recv" thread MAY shutdown this service in order to interrupt the "send"
148- * thread; the latter may be blocked on {@link Send #awaitCanSend} or
149- * {@link Send #awaitCanPrepareNext}.
147+ * thread; the latter may be blocked on {@link State #awaitCanSend} or
148+ * {@link State #awaitCanPrepareNext}.
150149 */
151150 private final ExecutorService sendService = Executors .newSingleThreadExecutor ();
152151
@@ -158,6 +157,9 @@ public final class BatchContext<PropertiesT> implements Closeable {
158157 */
159158 private final ScheduledExecutorService scheduledService = Executors .newScheduledThreadPool (1 );
160159
160+ /** Executor for processing {@link Event.Results} and enqueuing retried items. */
161+ private final ExecutorService retryService = Executors .newSingleThreadExecutor ();
162+
161163 /** The thread that created the context. */
162164 private final Thread parent = Thread .currentThread ();
163165
@@ -208,20 +210,6 @@ public final class BatchContext<PropertiesT> implements Closeable {
208210 /** stateChanged notifies threads about a state transition. */
209211 private final Condition stateChanged = lock .newCondition ();
210212
211- /**
212- * Releasing a permit notifies the "sender" about an incoming
213- * {@link Event.Results} batch. Acquire a permit to await the next batch.
214- *
215- * <p>
216- * A semaphore provides signal semantics, similar to a {@link Condition},
217- * but without being associated with a predicate. This comes handy when
218- * {@link wip} is being drained after the context is closed, and the "sender"
219- * needs to be notified about incoming {@link Event.Results}; a separate
220- * condition is not necessary, as we can simply probe the {@link queue} to
221- * find out if any new items have been added to it.
222- */
223- private final Semaphore awaitResults = new Semaphore (0 );
224-
225213 /**
226214 * Client-side part of the current stream, created on {@link #start}.
227215 * Other threads MAY use stream but MUST NOT update this field on their own.
@@ -323,6 +311,11 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException
323311 throw new DuplicateTaskException (taskHandle , existing );
324312 }
325313
314+ // Remove the task from the WIP list as soon as it completes,
315+ // successfully or otherwise. Note that the TaskHandle::done future
316+ // only completes exceptionally after all retries have been exhausted.
317+ taskHandle .done ().whenComplete ((__ , t ) -> wip .remove (taskHandle .id ()));
318+
326319 queue .put (taskHandle );
327320 return taskHandle ;
328321 }
@@ -507,6 +500,7 @@ private void shutdownNow(Exception e) {
507500 private void shutdownExecutors () {
508501 sendService .shutdownNow ();
509502 scheduledService .shutdownNow ();
503+ retryService .shutdownNow ();
510504 shutdownService .shutdownNow ();
511505 scheduledReconnectService .shutdownNow ();
512506 }
@@ -647,7 +641,11 @@ private void trySend() {
647641
648642 TaskHandle task = queue .take ();
649643
650- if (task == TaskHandle .POISON ) {
644+ if (task == TaskHandle .END_RESULTS ) {
645+ // This marker is only relevant when sent after POISON,
646+ // as we process retried items in drainWip.
647+ continue ;
648+ } else if (task == TaskHandle .POISON ) {
651649 assert closed : "queue poisoned before the context is closed" ;
652650
653651 log .debug ("Took poison" );
@@ -673,10 +671,9 @@ private void trySend() {
673671 return ;
674672 }
675673
676- Data data = task .data ();
677- batch .add (data );
674+ batch .add (task .data ());
678675
679- // Retred tasks already exist in the WIP list, replacing them is redundant.
676+ // Retried tasks already exist in the WIP list, replacing them is redundant.
680677 wip .putIfAbsent (task .id (), task );
681678 }
682679 } catch (InterruptedException ignored ) {
@@ -695,14 +692,14 @@ private void trySend() {
695692 * in the while-loop is that nothing is sent <i>unless</i> the batch is full.
696693 */
697694 private void send () throws InterruptedException {
698- log .atInfo ()
699- .addKeyValue ("batch_size_total_bytes" , batch ::sizeBytes )
700- .log ("Send next batch" );
701-
702695 // Continue flushing until we get the batch to a "not full" state.
703696 // This is to account for the backlog, which might re-fill the batch
704697 // after .clear().
705698 while (batch .isFull ()) {
699+ log .atInfo ()
700+ .addKeyValue ("batch_size_items" , batch ::size )
701+ .addKeyValue ("batch_size_total_bytes" , batch ::sizeBytes )
702+ .log ("Send next batch" );
706703 flush ();
707704 }
708705 assert !batch .isFull () : "batch is full after send" ;
@@ -729,24 +726,22 @@ private void drainWip() throws InterruptedException {
729726 .addKeyValue ("batch_size_total_items" , batch ::size )
730727 .addKeyValue ("wip_tasks" , wip ::size )
731728 .log ("Await Results" );
732- awaitResults .acquire ();
733729
734730 TaskHandle task ;
735- while ((task = queue .poll ()) != null ) {
736- Data data = task .data ();
737- batch .add (data );
731+ while ((task = queue .take ()) != TaskHandle .END_RESULTS ) {
732+ batch .add (task .data ());
738733 }
739734
740735 assert batch .size () <= wip .size () : "batch has more items than wip" ;
741736
742737 if (batch .size () == wip .size ()) {
743738 // This means the batch already contains all items in WIP,
744739 // and no more tasks will be added to the queue until the
745- // current ones are send .
740+ // current ones are sent .
746741 drain ();
747742 } else {
748743 // Only sends if the batch is full. If the batch is not full,
749- // the we can keep accumulating items from the failed tasks.
744+ // then we can keep accumulating items from the failed tasks.
750745 send ();
751746 }
752747 }
@@ -773,7 +768,7 @@ private void drain() throws InterruptedException {
773768 }
774769
775770 /**
776- * Block until the current state allows {@link State#canSend },
771+ * Block until the current state allows {@link State#canSendNext },
777772 * then prepare the batch, send it, and set InFlight state.
778773 * Block until the current state allows {@link State#canPrepareNext}.
779774 *
@@ -953,24 +948,28 @@ private void onResults(Event.Results results) {
953948 .addKeyValue ("wip_tasks" , wip ::size )
954949 .log ("Received Results" );
955950
956- // Remove successfully completed tasks from the WIP list and mark them done.
957- results . successful (). stream ()
958- . map ( wip :: remove ). filter ( Objects :: nonNull )
959- . forEach ( TaskHandle :: setSuccess );
960-
961- // Report errors for failed tasks. Do NOT remove them from the WIP list .
962- results .errors (). keySet ().stream ()
963- .map (wip ::get ).filter (Objects ::nonNull )
964- . forEach ( taskHandle -> taskHandle . setError (
965- new ServerException ( results . errors (). get ( taskHandle . id ()))));
966-
967- log . atDebug ()
968- . addKeyValue ( "count_success" , results . errors (). size () )
969- . addKeyValue ( "count_errors" , results . successful (). size ())
970- . log ( "Received results" );
951+ // Scheduling items to be retried may block temporarily if the queue is not
952+ // adequately sized. Offload the next step to a different thread to avoid
953+ // blocking the internal gRPC thread on which this callback is running.
954+ retryService . execute (() -> {
955+ // Mark successfully completed tasks in the WIP list.
956+ // The whenComplete hook added in .add() will remove them from WIP.
957+ results .successful ().stream ()
958+ .map (wip ::get ).filter (Objects ::nonNull )
959+ . forEach ( TaskHandle :: setSuccess );
960+
961+ // Report errors for failed tasks. Do NOT remove them from the WIP list.
962+ results . errors (). keySet (). stream ()
963+ . map ( wip :: get ). filter ( Objects :: nonNull )
964+ . forEach ( taskHandle -> taskHandle . setError (
965+ new ServerException ( results . errors (). get ( taskHandle . id ()))) );
971966
972- awaitResults .release ();
973- assert awaitResults .availablePermits () == 1 ;
967+ try {
968+ queue .put (TaskHandle .END_RESULTS );
969+ } catch (InterruptedException e ) {
970+ Thread .currentThread ().interrupt ();
971+ }
972+ });
974973 }
975974
976975 private void onBackoff (Event .Backoff backoff ) {
0 commit comments