Skip to content

Commit 1a19079

Browse files
committed
fix(batch): drain retried items
1 parent f1f89ad commit 1a19079

4 files changed

Lines changed: 142 additions & 67 deletions

File tree

src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.Future;
2222
import java.util.concurrent.ScheduledExecutorService;
2323
import java.util.concurrent.ScheduledFuture;
24-
import java.util.concurrent.Semaphore;
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.locks.Condition;
2726
import java.util.concurrent.locks.Lock;
@@ -81,7 +80,7 @@
8180
* <li>{@code null} -- context hasn't been {@link #start}ed yet. The context
8281
* SHOULD NOT be used in this state, as it will likely result in an NPE.
8382
* <li>AwaitStarted -- client's opened the stream, sent Start,
84-
* and is now awaiting for the server to respond with Started.
83+
* and is now awaiting the server to respond with Started.
8584
* <li>Active -- the server is ready to accept the next Data message.
8685
* <li>InFlight -- the latest batch has been sent, awaiting Acks.
8786
* <li>OOM -- server has OOM'ed and will not accept any more data.
@@ -145,8 +144,8 @@ public final class BatchContext<PropertiesT> implements Closeable {
145144
* <p>
146145
* In the event of abrupt stream termination ({@link Recv#onError} is called),
147146
* the "recv" thread MAY shutdown this service in order to interrupt the "send"
148-
* thread; the latter may be blocked on {@link Send#awaitCanSend} or
149-
* {@link Send#awaitCanPrepareNext}.
147+
* thread; the latter may be blocked on {@link State#awaitCanSend} or
148+
* {@link State#awaitCanPrepareNext}.
150149
*/
151150
private final ExecutorService sendService = Executors.newSingleThreadExecutor();
152151

@@ -158,6 +157,9 @@ public final class BatchContext<PropertiesT> implements Closeable {
158157
*/
159158
private final ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
160159

160+
/** Executor for processing {@link Event.Results} and enqueuing retried items. */
161+
private final ExecutorService retryService = Executors.newSingleThreadExecutor();
162+
161163
/** The thread that created the context. */
162164
private final Thread parent = Thread.currentThread();
163165

@@ -208,20 +210,6 @@ public final class BatchContext<PropertiesT> implements Closeable {
208210
/** stateChanged notifies threads about a state transition. */
209211
private final Condition stateChanged = lock.newCondition();
210212

211-
/**
212-
* Releasing a permit notifies the "sender" about an incoming
213-
* {@link Event.Results} batch. Acquire a permit to await the next batch.
214-
*
215-
* <p>
216-
* A semaphore provides signal semantics, similar to a {@link Condition},
217-
* but without being associated with a predicate. This comes handy when
218-
* {@link wip} is being drained after the context is closed, and the "sender"
219-
* needs to be notified about incoming {@link Event.Results}; a separate
220-
* condition is not necessary, as we can simply probe the {@link queue} to
221-
* find out if any new items have been added to it.
222-
*/
223-
private final Semaphore awaitResults = new Semaphore(0);
224-
225213
/**
226214
* Client-side part of the current stream, created on {@link #start}.
227215
* Other threads MAY use stream but MUST NOT update this field on their own.
@@ -323,6 +311,11 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException
323311
throw new DuplicateTaskException(taskHandle, existing);
324312
}
325313

314+
// Remove the task from the WIP list as soon as it completes,
315+
// successfully or otherwise. Note that the TaskHandle::done future
316+
// only completes exceptionally after all retries have been exhausted.
317+
taskHandle.done().whenComplete((__, t) -> wip.remove(taskHandle.id()));
318+
326319
queue.put(taskHandle);
327320
return taskHandle;
328321
}
@@ -507,6 +500,7 @@ private void shutdownNow(Exception e) {
507500
private void shutdownExecutors() {
508501
sendService.shutdownNow();
509502
scheduledService.shutdownNow();
503+
retryService.shutdownNow();
510504
shutdownService.shutdownNow();
511505
scheduledReconnectService.shutdownNow();
512506
}
@@ -647,7 +641,11 @@ private void trySend() {
647641

648642
TaskHandle task = queue.take();
649643

650-
if (task == TaskHandle.POISON) {
644+
if (task == TaskHandle.END_RESULTS) {
645+
// This marker is only relevant when sent after POISON,
646+
// as we process retried items in drainWip.
647+
continue;
648+
} else if (task == TaskHandle.POISON) {
651649
assert closed : "queue poisoned before the context is closed";
652650

653651
log.debug("Took poison");
@@ -673,10 +671,9 @@ private void trySend() {
673671
return;
674672
}
675673

676-
Data data = task.data();
677-
batch.add(data);
674+
batch.add(task.data());
678675

679-
// Retred tasks already exist in the WIP list, replacing them is redundant.
676+
// Retried tasks already exist in the WIP list, replacing them is redundant.
680677
wip.putIfAbsent(task.id(), task);
681678
}
682679
} catch (InterruptedException ignored) {
@@ -695,14 +692,14 @@ private void trySend() {
695692
* in the while-loop is that nothing is sent <i>unless</i> the batch is full.
696693
*/
697694
private void send() throws InterruptedException {
698-
log.atInfo()
699-
.addKeyValue("batch_size_total_bytes", batch::sizeBytes)
700-
.log("Send next batch");
701-
702695
// Continue flushing until we get the batch to a "not full" state.
703696
// This is to account for the backlog, which might re-fill the batch
704697
// after .clear().
705698
while (batch.isFull()) {
699+
log.atInfo()
700+
.addKeyValue("batch_size_items", batch::size)
701+
.addKeyValue("batch_size_total_bytes", batch::sizeBytes)
702+
.log("Send next batch");
706703
flush();
707704
}
708705
assert !batch.isFull() : "batch is full after send";
@@ -729,24 +726,22 @@ private void drainWip() throws InterruptedException {
729726
.addKeyValue("batch_size_total_items", batch::size)
730727
.addKeyValue("wip_tasks", wip::size)
731728
.log("Await Results");
732-
awaitResults.acquire();
733729

734730
TaskHandle task;
735-
while ((task = queue.poll()) != null) {
736-
Data data = task.data();
737-
batch.add(data);
731+
while ((task = queue.take()) != TaskHandle.END_RESULTS) {
732+
batch.add(task.data());
738733
}
739734

740735
assert batch.size() <= wip.size() : "batch has more items than wip";
741736

742737
if (batch.size() == wip.size()) {
743738
// This means the batch already contains all items in WIP,
744739
// and no more tasks will be added to the queue until the
745-
// current ones are send.
740+
// current ones are sent.
746741
drain();
747742
} else {
748743
// Only sends if the batch is full. If the batch is not full,
749-
// the we can keep accumulating items from the failed tasks.
744+
// then we can keep accumulating items from the failed tasks.
750745
send();
751746
}
752747
}
@@ -773,7 +768,7 @@ private void drain() throws InterruptedException {
773768
}
774769

775770
/**
776-
* Block until the current state allows {@link State#canSend},
771+
* Block until the current state allows {@link State#canSendNext},
777772
* then prepare the batch, send it, and set InFlight state.
778773
* Block until the current state allows {@link State#canPrepareNext}.
779774
*
@@ -953,24 +948,28 @@ private void onResults(Event.Results results) {
953948
.addKeyValue("wip_tasks", wip::size)
954949
.log("Received Results");
955950

956-
// Remove successfully completed tasks from the WIP list and mark them done.
957-
results.successful().stream()
958-
.map(wip::remove).filter(Objects::nonNull)
959-
.forEach(TaskHandle::setSuccess);
960-
961-
// Report errors for failed tasks. Do NOT remove them from the WIP list.
962-
results.errors().keySet().stream()
963-
.map(wip::get).filter(Objects::nonNull)
964-
.forEach(taskHandle -> taskHandle.setError(
965-
new ServerException(results.errors().get(taskHandle.id()))));
966-
967-
log.atDebug()
968-
.addKeyValue("count_success", results.errors().size())
969-
.addKeyValue("count_errors", results.successful().size())
970-
.log("Received results");
951+
// Scheduling items to be retried may block temporarily if the queue is not
952+
// adequately sized. Offload the next step to a different thread to avoid
953+
// blocking the internal gRPC thread on which this callback is running.
954+
retryService.execute(() -> {
955+
// Mark successfully completed tasks in the WIP list.
956+
// The whenComplete hook added in .add() will remove them from WIP.
957+
results.successful().stream()
958+
.map(wip::get).filter(Objects::nonNull)
959+
.forEach(TaskHandle::setSuccess);
960+
961+
// Report errors for failed tasks. Do NOT remove them from the WIP list.
962+
results.errors().keySet().stream()
963+
.map(wip::get).filter(Objects::nonNull)
964+
.forEach(taskHandle -> taskHandle.setError(
965+
new ServerException(results.errors().get(taskHandle.id()))));
971966

972-
awaitResults.release();
973-
assert awaitResults.availablePermits() == 1;
967+
try {
968+
queue.put(TaskHandle.END_RESULTS);
969+
} catch (InterruptedException e) {
970+
Thread.currentThread().interrupt();
971+
}
972+
});
974973
}
975974

976975
private void onBackoff(Event.Backoff backoff) {

src/main/java/io/weaviate/client6/v1/api/collections/batch/RetriableTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ public final boolean setSuccess() {
5858
* Mark the task failed. This status cannot be changed, so calling
5959
* {@link #setSuccess} afterwards will have no effect.
6060
*
61-
* @param error Error message. Null values are tolerated, but are only expected
62-
* to occur due to a server's mistake.
63-
* Do not use {@code setError(null)} if the server reports success
64-
* status for the task; prefer {@link #setSuccess} in that case.
61+
* @param t Error message. Null values are tolerated, but are only expected
62+
* to occur due to a server's mistake.
63+
* Do not use {@code setError(null)} if the server reports success
64+
* status for the task; prefer {@link #setSuccess} in that case.
6565
*/
6666
public final boolean setError(Throwable t) {
6767
return current.completeExceptionally(t);

src/main/java/io/weaviate/client6/v1/api/collections/batch/TaskHandle.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,17 @@
1616
@ThreadSafe
1717
@SuppressWarnings("deprecation") // protoc uses GeneratedMessageV3
1818
public final class TaskHandle extends RetriableTask {
19-
static final TaskHandle POISON = new TaskHandle();
19+
/**
20+
* Poison pill informs the "sender" that the batch is closed
21+
* and no more user-supplied items will be incoming.
22+
*/
23+
static final TaskHandle POISON = new TaskHandle("POISON");
24+
/**
25+
* Break pill informs the "sender" that the event handler has finished
26+
* processing an incoming {@link Event.Results} message and no more retry items
27+
* will be added to the queue until the next message arrives.
28+
*/
29+
static final TaskHandle END_RESULTS = new TaskHandle("END_RESULTS");
2030

2131
/**
2232
* Input value as passed by the user.
@@ -56,16 +66,16 @@ private TaskHandle(Data data, RetryPolicy retryPolicy, Consumer<String> onRetry)
5666
}
5767

5868
/**
59-
* Poison pill constructor.
69+
* Poison / Break pill constructor.
6070
*
6171
* <p>
6272
* A handle created with this constructor should not be
6373
used for anything other than direct comparison using the {@code ==} operator;
6474
* calling any method on a poison pill is likely to result in a
6575
* {@link NullPointerException} being thrown.
6676
*/
67-
private TaskHandle() {
68-
super("POISON", RetryPolicy.never(), __ -> {
77+
private TaskHandle(String name) {
78+
super(name, RetryPolicy.never(), __ -> {
6979
});
7080
this.data = null;
7181
}
@@ -76,8 +86,8 @@ Data data() {
7686

7787
@Override
7888
public String toString() {
79-
if (this == POISON) {
80-
return "TaskHandle<POISON>";
89+
if (this == POISON || this == END_RESULTS) {
90+
return "TaskHandle<%s>".formatted(id());
8191
}
8292
return "TaskHandle<id=%s, retried=%d, created=%s>".formatted(id(), timesRetried(), createdAt);
8393
}

0 commit comments

Comments
 (0)