Skip to content

Commit 00e0ab8

Browse files
Closing now actually works
1 parent d22d4cf commit 00e0ab8

File tree

2 files changed

+25
-10
lines changed

2 files changed

+25
-10
lines changed

google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.storage.contrib.nio;
1818

19+
import com.google.common.collect.ImmutableList;
1920
import com.google.common.util.concurrent.Futures;
2021

2122
import java.io.Closeable;
@@ -72,6 +73,7 @@ public class SeekableByteChannelPrefetcher implements SeekableByteChannel {
7273
private final ExecutorService exec;
7374
private final Sorted buffers;
7475
private final ArrayList<Worker> idleWorkers;
76+
private final int workerCount;
7577

7678
private class Buffer {
7779
// index*bufferSize = file position. Set to -1 when we haven't yet decided.
@@ -139,6 +141,7 @@ public void init(long pos, Buffer buf) throws ExecutionException, InterruptedExc
139141

140142
public ByteBuffer call() throws IOException, ExecutionException, InterruptedException {
141143
if (pos > chan.size()) {
144+
reassignWorker(this);
142145
return null;
143146
}
144147
chan.position(pos);
@@ -163,10 +166,12 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts,
163166
this.idleWorkers = new ArrayList<>(this.prefetchingThreads + this.extraThreads);
164167
this.exec = Executors.newFixedThreadPool(prefetchingThreads + extraThreads);
165168
SeekableByteChannel chan = null;
166-
for (SeekableByteChannel bc : channels) {
169+
ImmutableList<SeekableByteChannel> underlyingChannels = ImmutableList.copyOf(channels);
170+
for (SeekableByteChannel bc : underlyingChannels) {
167171
chan = bc;
168172
idleWorkers.add(new Worker(bc));
169173
}
174+
workerCount = underlyingChannels.size();
170175
size = chan.size();
171176
position = 0;
172177
}
@@ -323,20 +328,27 @@ public boolean isOpen() {
323328
public void close() throws IOException {
324329
if (open) {
325330
closing = true;
326-
while (true) {
327-
synchronized (idleWorkers) {
328-
if (idleWorkers.size() == prefetchingThreads + extraThreads) {
331+
exec.shutdown();
332+
try {
333+
while (true) synchronized (idleWorkers) {
334+
if (idleWorkers.size() >= workerCount) {
329335
// every thread is idle, we're done.
330336
break;
331337
}
338+
idleWorkers.wait(60_000);
332339
}
340+
} catch (InterruptedException e) {
341+
System.out.println("Timed out while waiting for channels to close.");
333342
}
334-
exec.shutdown();
335343
try {
336344
exec.awaitTermination(60, TimeUnit.SECONDS);
337345
} catch (InterruptedException e) {
338346
exec.shutdownNow();
339347
}
348+
// Close all underlying channels
349+
for (Worker w : idleWorkers) {
350+
w.close();
351+
}
340352
open = false;
341353
}
342354
}
@@ -417,7 +429,9 @@ private void reassignWorker(Worker w) throws ExecutionException, InterruptedExce
417429
long curIndex = index(position);
418430
if (!closing) {
419431
for (int i = 0; i < prefetchingThreads; i++) {
420-
if (i > lastIndex) break;
432+
if (i > lastIndex) {
433+
break;
434+
}
421435
if (buffers.get(curIndex + i) == null) {
422436
// work for you!
423437
Buffer buf = getEmptyBuffer();
@@ -440,7 +454,9 @@ private void startPrefetching(long position) throws ExecutionException, Interrup
440454
long lastIndex = index(size);
441455
long curIndex = index(position);
442456
for (int i = 0; i < prefetchingThreads; i++) {
443-
if (i > lastIndex) break;
457+
if (i > lastIndex) {
458+
break;
459+
}
444460
if (buffers.get(curIndex + i) == null) {
445461
// work available!
446462
Worker w = tryGetIdleWorker();
@@ -449,7 +465,6 @@ private void startPrefetching(long position) throws ExecutionException, Interrup
449465
}
450466
Buffer buf = getEmptyBuffer();
451467
sicWorker(w, bufferSize * (curIndex + i), buf);
452-
return;
453468
}
454469
}
455470
}

google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
*/
2323
public class SeekableByteChannelPrefetcherOptions implements java.nio.file.OpenOption {
2424
// normal-case number of parallel reads.
25-
public int prefetchingThreads = 4;
25+
public int prefetchingThreads = 2;
2626
// in case the data we need isn't being prefetched, we can use up to this many
2727
// extra threads to fetch user-requested data.
2828
public int extraThreads = 1;
2929
// size in bytes for our buffer. Every fetcher grabs one buffer at a time.
3030
public int bufferSize = 50 * 1024 * 1024;
3131
// how many buffers we keep around. Should be at least prefetchingThreads + extraThreads.
3232
// bufferSize * bufferCount is how much memory this class'll allocate.
33-
public int bufferCount = 6;
33+
public int bufferCount = 4;
3434
}

0 commit comments

Comments
 (0)