diff --git a/java-storage/google-cloud-storage/pom.xml b/java-storage/google-cloud-storage/pom.xml
index 64c0aa4be522..ac2f992f356f 100644
--- a/java-storage/google-cloud-storage/pom.xml
+++ b/java-storage/google-cloud-storage/pom.xml
@@ -460,6 +460,15 @@
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+
+ true
+
+
+
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java
index 566a7765ebd5..b764970760c9 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java
@@ -99,7 +99,7 @@ static FakeHttpServer of(HttpRequestHandler server) {
static FakeHttpServer of(HttpRequestHandler server, boolean trailingSlash) {
// based on
// https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServer.java
- InetSocketAddress address = new InetSocketAddress("localhost", 0);
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -129,7 +129,7 @@ protected void initChannel(SocketChannel ch) {
InetSocketAddress socketAddress = (InetSocketAddress) channel.localAddress();
String suffix = trailingSlash ? "/" : "";
- URI endpoint = URI.create("http://localhost:" + socketAddress.getPort() + suffix);
+ URI endpoint = URI.create("http://127.0.0.1:" + socketAddress.getPort() + suffix);
HttpStorageOptions httpStorageOptions =
HttpStorageOptions.http()
.setHost(endpoint.toString())
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java
index 0481f2b062e6..87e28728868d 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java
@@ -16,10 +16,15 @@
package com.google.cloud.storage;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
+import com.google.cloud.ServiceFactory;
import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor;
import com.google.cloud.storage.it.runner.registry.Registry;
+import com.google.storage.v2.StorageClient;
import com.google.storage.v2.StorageGrpc;
import com.google.storage.v2.StorageSettings;
import io.grpc.Server;
@@ -28,16 +33,19 @@
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Locale;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
final class FakeServer implements AutoCloseable {
private final Server server;
private final GrpcStorageOptions grpcStorageOptions;
+ private final ScheduledThreadPoolExecutor executor;
- FakeServer(Server server, GrpcStorageOptions grpcStorageOptions) {
+ FakeServer(Server server, GrpcStorageOptions grpcStorageOptions, ScheduledThreadPoolExecutor executor) {
this.server = server;
this.grpcStorageOptions = grpcStorageOptions;
+ this.executor = executor;
}
GrpcStorageOptions getGrpcStorageOptions() {
@@ -45,19 +53,80 @@ GrpcStorageOptions getGrpcStorageOptions() {
}
StorageSettings storageSettings() throws IOException {
- return grpcStorageOptions.getStorageSettings();
+ StorageSettings settings = grpcStorageOptions.getStorageSettings();
+ if (executor != null) {
+ ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
+ StorageSettings.Builder builder = settings.toBuilder()
+ .setBackgroundExecutorProvider(executorProvider);
+ if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
+ builder.setTransportChannelProvider(
+ ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider())
+ .toBuilder()
+ .setExecutorProvider(executorProvider)
+ .build());
+ }
+ return builder.build();
+ }
+ return settings;
}
@Override
public void close() throws InterruptedException {
- server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
+ try {
+ server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ static void injectIsolatedClient(Storage storage, ScheduledThreadPoolExecutor executor) {
+ Storage delegate = storage;
+ if (storage instanceof OtelStorageDecorator) {
+ try {
+ java.lang.reflect.Field delegateField = OtelStorageDecorator.class.getDeclaredField("delegate");
+ delegateField.setAccessible(true);
+ delegate = (Storage) delegateField.get(storage);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to unwrap OtelStorageDecorator", e);
+ }
+ }
+ if (delegate instanceof GrpcStorageImpl) {
+ GrpcStorageImpl impl = (GrpcStorageImpl) delegate;
+ try {
+ StorageSettings settings = impl.getOptions().getStorageSettings();
+ ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
+ StorageSettings.Builder settingsBuilder = settings.toBuilder()
+ .setBackgroundExecutorProvider(executorProvider);
+ if (settingsBuilder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
+ settingsBuilder.setTransportChannelProvider(
+ ((InstantiatingGrpcChannelProvider) settingsBuilder.getTransportChannelProvider())
+ .toBuilder()
+ .setExecutorProvider(executorProvider)
+ .build());
+ }
+ StorageSettings isolatedSettings = settingsBuilder.build();
+ StorageClient isolatedClient = StorageClient.create(isolatedSettings);
+
+ java.lang.reflect.Field clientField = GrpcStorageImpl.class.getDeclaredField("storageClient");
+ clientField.setAccessible(true);
+ clientField.set(impl, isolatedClient);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to inject isolated StorageClient", e);
+ }
+ }
}
static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
- InetSocketAddress address = new InetSocketAddress("localhost", 0);
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);
Server server = NettyServerBuilder.forAddress(address).addService(service).build();
server.start();
String endpoint = String.format(Locale.US, "%s:%d", address.getHostString(), server.getPort());
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4);
+ executor.setKeepAliveTime(10, TimeUnit.SECONDS);
+ executor.allowCoreThreadTimeOut(true);
+
GrpcStorageOptions grpcStorageOptions =
StorageOptions.grpc()
.setHost("http://" + endpoint)
@@ -67,6 +136,16 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
.setEnableGrpcClientMetrics(false)
.setAttemptDirectPath(false)
.setOpenTelemetry(Registry.getInstance().otelSdk.get().get())
+ .setServiceFactory(
+ new ServiceFactory() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public Storage create(StorageOptions opts) {
+ Storage storage = new GrpcStorageOptions.GrpcStorageFactory().create(opts);
+ injectIsolatedClient(storage, executor);
+ return storage;
+ }
+ })
// cut most retry settings by half. we're hitting an in process server.
.setRetrySettings(
RetrySettings.newBuilder()
@@ -80,6 +159,6 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
.build())
.build();
- return new FakeServer(server, grpcStorageOptions);
+ return new FakeServer(server, grpcStorageOptions, executor);
}
}
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java
index 475ba6b35c9f..f1b1bfcd68d3 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java
@@ -835,12 +835,9 @@ static class BidiWriteService extends StorageImplBase {
private final BiConsumer, List>
c;
- private ImmutableList.Builder requests;
-
BidiWriteService(
BiConsumer, List> c) {
this.c = c;
- this.requests = new ImmutableList.Builder<>();
}
BidiWriteService(ImmutableMap, BidiWriteObjectResponse> writes) {
@@ -880,12 +877,16 @@ private static void logUnexpectedRequest(
.collect(joining),
build.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine));
LOGGER.warn(msg);
+ System.err.println(msg);
}
@Override
public StreamObserver bidiWriteObject(
StreamObserver obs) {
return new Adapter() {
+ private ImmutableList.Builder requests =
+ new ImmutableList.Builder<>();
+
@Override
public void onNext(BidiWriteObjectRequest value) {
requests.add(value);
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java
index 1e1c05915ab5..7762d38992d2 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java
@@ -105,7 +105,12 @@ public void readObject(
if (request.equals(req1)) {
responseObserver.onNext(resp1);
responseObserver.onNext(resp2);
- responseObserver.onError(apiException(Code.DATA_LOSS));
+ new Thread(() -> {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ignored) {}
+ responseObserver.onError(apiException(Code.DATA_LOSS));
+ }).start();
} else if (request.equals(req2)) {
responseObserver.onNext(resp3);
responseObserver.onNext(resp4);
@@ -194,7 +199,12 @@ public void readObject(
if (count == 0) {
responseObserver.onNext(resp1);
responseObserver.onNext(resp2);
- responseObserver.onError(apiException(Code.DATA_LOSS));
+ new Thread(() -> {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ignored) {}
+ responseObserver.onError(apiException(Code.DATA_LOSS));
+ }).start();
}
} else if (request.equals(req2)) {
ReadObjectResponse.Builder builder = resp3.toBuilder();
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java
index 1cb32704982b..2c9368553282 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java
@@ -467,6 +467,7 @@ public void bidiReadObjectError() throws Exception {
assertThat(actual).hasLength(5);
assertThat(xxd(actual)).isEqualTo(xxd(content3.getBytes()));
}
+ Thread.sleep(50);
}
}
@@ -746,6 +747,7 @@ public void retrySettingsApplicable_objectRangeData_offset_notAligned_gt() throw
.contains("position = 10, readRange.read_offset = 11"),
() -> assertThat(suppressedMessages).contains("Asynchronous task failed"));
}
+ Thread.sleep(50);
}
}
@@ -795,6 +797,7 @@ public void validateReadRemovedFromStateWhenFailed() throws Exception {
ObjectReadSessionStreamRead> outstandingRead = orsi.state.getOutstandingRead(1L);
assertThat(outstandingRead).isNull();
}
+ Thread.sleep(50);
}
}
@@ -938,6 +941,7 @@ public void onNext(BidiReadObjectRequest request) {
assert503(f2),
assert503(f3));
}
+ Thread.sleep(50);
}
}
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java
index 4d9340762093..e130627abc56 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java
@@ -252,7 +252,7 @@ public void start() {
ImmutableList.of(
"docker",
"run",
- "-i",
+ "-d",
"--rm",
"--publish",
port + ":9000",
@@ -323,62 +323,50 @@ public void stop() {
return;
}
try {
- process.destroy();
- process.waitFor(2, TimeUnit.SECONDS);
- boolean attemptForceStopContainer = false;
- try {
- int processExitValue = process.exitValue();
- if (processExitValue != 0) {
- attemptForceStopContainer = true;
- }
- LOGGER.warn("Container exit value = {}", processExitValue);
- } catch (IllegalThreadStateException e) {
- attemptForceStopContainer = true;
- }
-
- if (attemptForceStopContainer) {
- LOGGER.warn("Container did not gracefully exit, attempting to explicitly stop it.");
+ if (!Boolean.getBoolean("testbench.keepAlive")) {
+ LOGGER.warn("Attempting to explicitly stop container: {}", containerName);
ImmutableList command = ImmutableList.of("docker", "kill", containerName);
LOGGER.warn(command.toString());
Process shutdownProcess = new ProcessBuilder(command).start();
shutdownProcess.waitFor(5, TimeUnit.SECONDS);
int shutdownProcessExitValue = shutdownProcess.exitValue();
- LOGGER.warn("Container exit value = {}", shutdownProcessExitValue);
+ LOGGER.warn("Container stop exit value = {}", shutdownProcessExitValue);
+
+ // wait for the server to shutdown
+ runWithRetries(
+ () -> {
+ try {
+ listRetryTests();
+ } catch (SocketException e) {
+ // desired result
+ return null;
+ }
+ throw new NotShutdownException();
+ },
+ RetrySettings.newBuilder()
+ .setTotalTimeoutDuration(Duration.ofSeconds(30))
+ .setInitialRetryDelayDuration(Duration.ofMillis(500))
+ .setRetryDelayMultiplier(1.5)
+ .setMaxRetryDelayDuration(Duration.ofSeconds(5))
+ .build(),
+ new BasicResultRetryAlgorithm>() {
+ @Override
+ public boolean shouldRetry(Throwable previousThrowable, List> previousResponse) {
+ return previousThrowable instanceof NotShutdownException;
+ }
+ },
+ NanoClock.getDefaultClock());
}
-
- // wait for the server to shutdown
- runWithRetries(
- () -> {
- try {
- listRetryTests();
- } catch (SocketException e) {
- // desired result
- return null;
- }
- throw new NotShutdownException();
- },
- RetrySettings.newBuilder()
- .setTotalTimeoutDuration(Duration.ofSeconds(30))
- .setInitialRetryDelayDuration(Duration.ofMillis(500))
- .setRetryDelayMultiplier(1.5)
- .setMaxRetryDelayDuration(Duration.ofSeconds(5))
- .build(),
- new BasicResultRetryAlgorithm>() {
- @Override
- public boolean shouldRetry(Throwable previousThrowable, List> previousResponse) {
- return previousThrowable instanceof NotShutdownException;
- }
- },
- NanoClock.getDefaultClock());
+ } catch (InterruptedException | IOException e) {
+ throw new RuntimeException(e);
+ } finally {
try {
- Files.delete(errPath);
- Files.delete(outPath);
- Files.delete(tempDirectory);
+ Files.deleteIfExists(errPath);
+ Files.deleteIfExists(outPath);
+ Files.deleteIfExists(tempDirectory);
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOGGER.warn("Failed to delete temporary files", e);
}
- } catch (InterruptedException | IOException e) {
- throw new RuntimeException(e);
}
}
@@ -442,8 +430,8 @@ public String toString() {
}
static final class Builder {
- private static final String DEFAULT_BASE_URI = "http://localhost:9000";
- private static final String DEFAULT_GRPC_BASE_URI = "http://localhost:9005";
+ private static final String DEFAULT_BASE_URI = "http://127.0.0.1:9000";
+ private static final String DEFAULT_GRPC_BASE_URI = "http://127.0.0.1:9005";
private static final String DEFAULT_IMAGE_NAME;
private static final String DEFAULT_IMAGE_TAG;