diff --git a/java-storage/google-cloud-storage/pom.xml b/java-storage/google-cloud-storage/pom.xml index 64c0aa4be522..ac2f992f356f 100644 --- a/java-storage/google-cloud-storage/pom.xml +++ b/java-storage/google-cloud-storage/pom.xml @@ -460,6 +460,15 @@ + + org.apache.maven.plugins + maven-failsafe-plugin + + + true + + + diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java index 566a7765ebd5..b764970760c9 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeHttpServer.java @@ -99,7 +99,7 @@ static FakeHttpServer of(HttpRequestHandler server) { static FakeHttpServer of(HttpRequestHandler server, boolean trailingSlash) { // based on // https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServer.java - InetSocketAddress address = new InetSocketAddress("localhost", 0); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0); // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -129,7 +129,7 @@ protected void initChannel(SocketChannel ch) { InetSocketAddress socketAddress = (InetSocketAddress) channel.localAddress(); String suffix = trailingSlash ? "/" : ""; - URI endpoint = URI.create("http://localhost:" + socketAddress.getPort() + suffix); + URI endpoint = URI.create("http://127.0.0.1:" + socketAddress.getPort() + suffix); HttpStorageOptions httpStorageOptions = HttpStorageOptions.http() .setHost(endpoint.toString()) diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java index 0481f2b062e6..87e28728868d 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java @@ -16,10 +16,15 @@ package com.google.cloud.storage; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.NoCredentials; +import com.google.cloud.ServiceFactory; import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor; import com.google.cloud.storage.it.runner.registry.Registry; +import com.google.storage.v2.StorageClient; import com.google.storage.v2.StorageGrpc; import com.google.storage.v2.StorageSettings; import io.grpc.Server; @@ -28,16 +33,19 @@ import java.net.InetSocketAddress; import java.time.Duration; import java.util.Locale; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; final class FakeServer implements AutoCloseable { private final Server server; private final GrpcStorageOptions grpcStorageOptions; + private final ScheduledThreadPoolExecutor executor; - FakeServer(Server server, GrpcStorageOptions grpcStorageOptions) { + FakeServer(Server server, GrpcStorageOptions grpcStorageOptions, ScheduledThreadPoolExecutor executor) { this.server = server; this.grpcStorageOptions = grpcStorageOptions; + this.executor = executor; } GrpcStorageOptions getGrpcStorageOptions() { @@ -45,19 +53,80 @@ GrpcStorageOptions getGrpcStorageOptions() { } StorageSettings storageSettings() throws IOException { - return grpcStorageOptions.getStorageSettings(); + StorageSettings settings = grpcStorageOptions.getStorageSettings(); + if (executor != null) { + ExecutorProvider executorProvider = FixedExecutorProvider.create(executor); + StorageSettings.Builder builder = settings.toBuilder() + .setBackgroundExecutorProvider(executorProvider); + if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) { + builder.setTransportChannelProvider( + ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()) + .toBuilder() + .setExecutorProvider(executorProvider) + .build()); + } + return builder.build(); + } + return settings; } @Override public void close() throws InterruptedException { - server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); + try { + server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); + } finally { + if (executor != null) { + executor.shutdownNow(); + } + } + } + + static void injectIsolatedClient(Storage storage, ScheduledThreadPoolExecutor executor) { + Storage delegate = storage; + if (storage instanceof OtelStorageDecorator) { + try { + java.lang.reflect.Field delegateField = OtelStorageDecorator.class.getDeclaredField("delegate"); + delegateField.setAccessible(true); + delegate = (Storage) delegateField.get(storage); + } catch (Exception e) { + throw new RuntimeException("Failed to unwrap OtelStorageDecorator", e); + } + } + if (delegate instanceof GrpcStorageImpl) { + GrpcStorageImpl impl = (GrpcStorageImpl) delegate; + try { + StorageSettings settings = impl.getOptions().getStorageSettings(); + ExecutorProvider executorProvider = FixedExecutorProvider.create(executor); + StorageSettings.Builder settingsBuilder = settings.toBuilder() + .setBackgroundExecutorProvider(executorProvider); + if (settingsBuilder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) { + settingsBuilder.setTransportChannelProvider( + ((InstantiatingGrpcChannelProvider) settingsBuilder.getTransportChannelProvider()) + .toBuilder() + .setExecutorProvider(executorProvider) + .build()); + } + StorageSettings isolatedSettings = settingsBuilder.build(); + StorageClient isolatedClient = StorageClient.create(isolatedSettings); + + java.lang.reflect.Field clientField = GrpcStorageImpl.class.getDeclaredField("storageClient"); + clientField.setAccessible(true); + clientField.set(impl, isolatedClient); + } catch (Exception e) { + throw new RuntimeException("Failed to inject isolated StorageClient", e); + } + } } static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { - InetSocketAddress address = new InetSocketAddress("localhost", 0); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0); Server server = NettyServerBuilder.forAddress(address).addService(service).build(); server.start(); String endpoint = String.format(Locale.US, "%s:%d", address.getHostString(), server.getPort()); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4); + executor.setKeepAliveTime(10, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + GrpcStorageOptions grpcStorageOptions = StorageOptions.grpc() .setHost("http://" + endpoint) @@ -67,6 +136,16 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { .setEnableGrpcClientMetrics(false) .setAttemptDirectPath(false) .setOpenTelemetry(Registry.getInstance().otelSdk.get().get()) + .setServiceFactory( + new ServiceFactory() { + @Override + @SuppressWarnings("deprecation") + public Storage create(StorageOptions opts) { + Storage storage = new GrpcStorageOptions.GrpcStorageFactory().create(opts); + injectIsolatedClient(storage, executor); + return storage; + } + }) // cut most retry settings by half. we're hitting an in process server. .setRetrySettings( RetrySettings.newBuilder() @@ -80,6 +159,6 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException { .setMaxRpcTimeoutDuration(Duration.ofSeconds(25)) .build()) .build(); - return new FakeServer(server, grpcStorageOptions); + return new FakeServer(server, grpcStorageOptions, executor); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java index 475ba6b35c9f..f1b1bfcd68d3 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicBidiUnbufferedWritableByteChannelTest.java @@ -835,12 +835,9 @@ static class BidiWriteService extends StorageImplBase { private final BiConsumer, List> c; - private ImmutableList.Builder requests; - BidiWriteService( BiConsumer, List> c) { this.c = c; - this.requests = new ImmutableList.Builder<>(); } BidiWriteService(ImmutableMap, BidiWriteObjectResponse> writes) { @@ -880,12 +877,16 @@ private static void logUnexpectedRequest( .collect(joining), build.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine)); LOGGER.warn(msg); + System.err.println(msg); } @Override public StreamObserver bidiWriteObject( StreamObserver obs) { return new Adapter() { + private ImmutableList.Builder requests = + new ImmutableList.Builder<>(); + @Override public void onNext(BidiWriteObjectRequest value) { requests.add(value); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index 1e1c05915ab5..7762d38992d2 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -105,7 +105,12 @@ public void readObject( if (request.equals(req1)) { responseObserver.onNext(resp1); responseObserver.onNext(resp2); - responseObserver.onError(apiException(Code.DATA_LOSS)); + new Thread(() -> { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + responseObserver.onError(apiException(Code.DATA_LOSS)); + }).start(); } else if (request.equals(req2)) { responseObserver.onNext(resp3); responseObserver.onNext(resp4); @@ -194,7 +199,12 @@ public void readObject( if (count == 0) { responseObserver.onNext(resp1); responseObserver.onNext(resp2); - responseObserver.onError(apiException(Code.DATA_LOSS)); + new Thread(() -> { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + responseObserver.onError(apiException(Code.DATA_LOSS)); + }).start(); } } else if (request.equals(req2)) { ReadObjectResponse.Builder builder = resp3.toBuilder(); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java index 1cb32704982b..2c9368553282 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java @@ -467,6 +467,7 @@ public void bidiReadObjectError() throws Exception { assertThat(actual).hasLength(5); assertThat(xxd(actual)).isEqualTo(xxd(content3.getBytes())); } + Thread.sleep(50); } } @@ -746,6 +747,7 @@ public void retrySettingsApplicable_objectRangeData_offset_notAligned_gt() throw .contains("position = 10, readRange.read_offset = 11"), () -> assertThat(suppressedMessages).contains("Asynchronous task failed")); } + Thread.sleep(50); } } @@ -795,6 +797,7 @@ public void validateReadRemovedFromStateWhenFailed() throws Exception { ObjectReadSessionStreamRead outstandingRead = orsi.state.getOutstandingRead(1L); assertThat(outstandingRead).isNull(); } + Thread.sleep(50); } } @@ -938,6 +941,7 @@ public void onNext(BidiReadObjectRequest request) { assert503(f2), assert503(f3)); } + Thread.sleep(50); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 4d9340762093..e130627abc56 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -252,7 +252,7 @@ public void start() { ImmutableList.of( "docker", "run", - "-i", + "-d", "--rm", "--publish", port + ":9000", @@ -323,62 +323,50 @@ public void stop() { return; } try { - process.destroy(); - process.waitFor(2, TimeUnit.SECONDS); - boolean attemptForceStopContainer = false; - try { - int processExitValue = process.exitValue(); - if (processExitValue != 0) { - attemptForceStopContainer = true; - } - LOGGER.warn("Container exit value = {}", processExitValue); - } catch (IllegalThreadStateException e) { - attemptForceStopContainer = true; - } - - if (attemptForceStopContainer) { - LOGGER.warn("Container did not gracefully exit, attempting to explicitly stop it."); + if (!Boolean.getBoolean("testbench.keepAlive")) { + LOGGER.warn("Attempting to explicitly stop container: {}", containerName); ImmutableList command = ImmutableList.of("docker", "kill", containerName); LOGGER.warn(command.toString()); Process shutdownProcess = new ProcessBuilder(command).start(); shutdownProcess.waitFor(5, TimeUnit.SECONDS); int shutdownProcessExitValue = shutdownProcess.exitValue(); - LOGGER.warn("Container exit value = {}", shutdownProcessExitValue); + LOGGER.warn("Container stop exit value = {}", shutdownProcessExitValue); + + // wait for the server to shutdown + runWithRetries( + () -> { + try { + listRetryTests(); + } catch (SocketException e) { + // desired result + return null; + } + throw new NotShutdownException(); + }, + RetrySettings.newBuilder() + .setTotalTimeoutDuration(Duration.ofSeconds(30)) + .setInitialRetryDelayDuration(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.5) + .setMaxRetryDelayDuration(Duration.ofSeconds(5)) + .build(), + new BasicResultRetryAlgorithm>() { + @Override + public boolean shouldRetry(Throwable previousThrowable, List previousResponse) { + return previousThrowable instanceof NotShutdownException; + } + }, + NanoClock.getDefaultClock()); } - - // wait for the server to shutdown - runWithRetries( - () -> { - try { - listRetryTests(); - } catch (SocketException e) { - // desired result - return null; - } - throw new NotShutdownException(); - }, - RetrySettings.newBuilder() - .setTotalTimeoutDuration(Duration.ofSeconds(30)) - .setInitialRetryDelayDuration(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.5) - .setMaxRetryDelayDuration(Duration.ofSeconds(5)) - .build(), - new BasicResultRetryAlgorithm>() { - @Override - public boolean shouldRetry(Throwable previousThrowable, List previousResponse) { - return previousThrowable instanceof NotShutdownException; - } - }, - NanoClock.getDefaultClock()); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } finally { try { - Files.delete(errPath); - Files.delete(outPath); - Files.delete(tempDirectory); + Files.deleteIfExists(errPath); + Files.deleteIfExists(outPath); + Files.deleteIfExists(tempDirectory); } catch (IOException e) { - throw new RuntimeException(e); + LOGGER.warn("Failed to delete temporary files", e); } - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); } } @@ -442,8 +430,8 @@ public String toString() { } static final class Builder { - private static final String DEFAULT_BASE_URI = "http://localhost:9000"; - private static final String DEFAULT_GRPC_BASE_URI = "http://localhost:9005"; + private static final String DEFAULT_BASE_URI = "http://127.0.0.1:9000"; + private static final String DEFAULT_GRPC_BASE_URI = "http://127.0.0.1:9005"; private static final String DEFAULT_IMAGE_NAME; private static final String DEFAULT_IMAGE_TAG;