diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 50e507b0cd8b..f45f87a0b394 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -59,6 +59,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.rest.RESTCatalogProperties; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -415,11 +416,16 @@ private Map clientByPrefix() { .collect(Collectors.toList()) .forEach( storageCredential -> { - Map propertiesWithCredentials = + ImmutableMap.Builder propsBuilder = ImmutableMap.builder() .putAll(properties) - .putAll(storageCredential.config()) - .buildKeepingLast(); + .putAll(storageCredential.config()); + if (storageCredential.storageRefreshToken() != null) { + propsBuilder.put( + RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, + storageCredential.storageRefreshToken()); + } + Map propertiesWithCredentials = propsBuilder.buildKeepingLast(); localClientByPrefix.put( storageCredential.prefix(), @@ -465,7 +471,7 @@ private void refreshStorageCredentials() { List refreshed = provider.fetchCredentials().credentials().stream() .filter(c -> c.prefix().startsWith(ROOT_PREFIX)) - .map(c -> StorageCredential.create(c.prefix(), c.config())) + .map(c -> StorageCredential.create(c.prefix(), c.config(), c.storageRefreshToken())) .collect(Collectors.toList()); if (!refreshed.isEmpty() && !isResourceClosed.get()) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java index 75d114d4efbe..c9a1594baf91 100644 --- 
a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java @@ -27,6 +27,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.ErrorHandlers; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalogProperties; @@ -53,6 +54,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut private final String catalogEndpoint; private final String credentialsEndpoint; private final String planId; + private final String storageRefreshToken; private AuthManager authManager; private AuthSession authSession; @@ -69,6 +71,8 @@ private VendedCredentialsProvider(Map properties) { this.catalogEndpoint = properties.get(CatalogProperties.URI); this.credentialsEndpoint = properties.get(URI); this.planId = properties.getOrDefault(RESTCatalogProperties.REST_SCAN_PLAN_ID, null); + this.storageRefreshToken = + properties.getOrDefault(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, null); } @Override @@ -108,10 +112,19 @@ private RESTClient httpClient() { } LoadCredentialsResponse fetchCredentials() { + Map queryParams = null; + if (null != planId) { + queryParams = Maps.newHashMap(); + queryParams.put("planId", planId); + } else if (null != storageRefreshToken) { + queryParams = Maps.newHashMap(); + queryParams.put("storageRefreshToken", storageRefreshToken); + } + return httpClient() .get( credentialsEndpoint, - null != planId ? 
Map.of("planId", planId) : null, + queryParams, LoadCredentialsResponse.class, Map.of(), ErrorHandlers.defaultErrorHandler()); diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java index 0a8b0e084873..eeb20bf2fce7 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOCredentialRefresh.java @@ -179,6 +179,86 @@ public void credentialRefreshSchedulesNextRefresh() { } } + @Test + public void refreshedCredentialsPreserveStorageRefreshToken() { + String nearExpiryMs = Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli()); + + StorageCredential initialCredential = + StorageCredential.create( + "s3://bucket/path", + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "initialAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "initialSecretKey", + S3FileIOProperties.SESSION_TOKEN, + "initialToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + nearExpiryMs), + "initial-refresh-token"); + + String refreshedExpiryMs = + Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli()); + String rotatedToken = "rotated-refresh-token"; + LoadCredentialsResponse refreshResponse = + ImmutableLoadCredentialsResponse.builder() + .addCredentials( + ImmutableCredential.builder() + .prefix("s3://bucket/path") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "refreshedAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "refreshedSecretKey", + S3FileIOProperties.SESSION_TOKEN, + "refreshedToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + refreshedExpiryMs)) + .storageRefreshToken(rotatedToken) + .build()) + .build(); + + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + HttpResponse mockResponse = + 
response(LoadCredentialsResponseParser.toJson(refreshResponse)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + Map properties = + ImmutableMap.of( + AwsProperties.CLIENT_FACTORY, + StaticClientFactory.class.getName(), + VendedCredentialsProvider.URI, + CREDENTIALS_URI, + CatalogProperties.URI, + CATALOG_URI, + "init-creation-stacktrace", + "false"); + + StaticClientFactory.client = null; + try (S3FileIO fileIO = new S3FileIO()) { + fileIO.initialize(properties); + fileIO.setCredentials(List.of(initialCredential)); + + fileIO.client(); + + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(1))); + + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted( + () -> { + List credentials = fileIO.credentials(); + assertThat(credentials).hasSize(1); + assertThat(credentials.get(0).config()) + .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "refreshedAccessKey"); + assertThat(credentials.get(0).storageRefreshToken()).isEqualTo(rotatedToken); + }); + } + } + @Test public void credentialRefreshWithinFiveMinuteWindow() { // Set up credentials expiring within the next 5 minutes so the refresh triggers immediately diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java index d0287dc3080c..01110321e0df 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java @@ -538,6 +538,136 @@ public void planIdQueryParamIsSent() { mockServer.verify(mockRequest, VerificationTimes.exactly(2)); } + @Test + public void storageRefreshTokenQueryParamIsSent() { + String refreshToken = "opaque-refresh-token-123"; + HttpRequest mockRequest = + request("/v1/credentials") + .withMethod(HttpMethod.GET.name()) + 
.withQueryStringParameter("storageRefreshToken", refreshToken); + Credential credential = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + Map properties = + ImmutableMap.builder() + .putAll(PROPERTIES) + .put(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, refreshToken) + .build(); + + try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + verifyCredentials(awsCredentials, credential); + + AwsCredentials refreshedCredentials = provider.resolveCredentials(); + assertThat(refreshedCredentials).isNotSameAs(awsCredentials); + verifyCredentials(refreshedCredentials, credential); + } + + mockServer.verify(mockRequest, VerificationTimes.exactly(2)); + } + + @Test + public void storageRefreshTokenNotSentWhenNull() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + Credential credential = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli()))) + .build(); + LoadCredentialsResponse response 
= + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + verifyCredentials(awsCredentials, credential); + } + + mockServer.verify( + request("/v1/credentials") + .withMethod(HttpMethod.GET.name()) + .withQueryStringParameter("storageRefreshToken"), + VerificationTimes.never()); + } + + @Test + public void planIdTakesPrecedenceOverStorageRefreshToken() { + String planId = "plan-123"; + HttpRequest mockRequest = + request("/v1/credentials") + .withMethod(HttpMethod.GET.name()) + .withQueryStringParameter("planId", planId); + Credential credential = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + Map properties = + ImmutableMap.builder() + .putAll(PROPERTIES) + .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId) + .put(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, "should-be-ignored") + .build(); + + try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + verifyCredentials(awsCredentials, 
credential); + } + + mockServer.verify(mockRequest, VerificationTimes.atLeast(1)); + mockServer.verify( + request("/v1/credentials") + .withMethod(HttpMethod.GET.name()) + .withQueryStringParameter("storageRefreshToken"), + VerificationTimes.never()); + } + private void verifyCredentials(AwsCredentials awsCredentials, Credential credential) { assertThat(awsCredentials).isInstanceOf(AwsSessionCredentials.class); AwsSessionCredentials creds = (AwsSessionCredentials) awsCredentials; diff --git a/core/src/main/java/org/apache/iceberg/io/ImmutableStorageCredential.java b/core/src/main/java/org/apache/iceberg/io/ImmutableStorageCredential.java index 9cfd4246dbb6..a80b95749d09 100644 --- a/core/src/main/java/org/apache/iceberg/io/ImmutableStorageCredential.java +++ b/core/src/main/java/org/apache/iceberg/io/ImmutableStorageCredential.java @@ -45,10 +45,13 @@ public final class ImmutableStorageCredential implements StorageCredential { private final String prefix; private final Map config; + @Nullable private final String storageRefreshToken; - private ImmutableStorageCredential(String prefix, Map config) { + private ImmutableStorageCredential( + String prefix, Map config, @Nullable String storageRefreshToken) { this.prefix = prefix; this.config = config; + this.storageRefreshToken = storageRefreshToken; } /** @@ -67,6 +70,15 @@ public Map config() { return config; } + /** + * @return The value of the {@code storageRefreshToken} attribute + */ + @Override + @Nullable + public String storageRefreshToken() { + return storageRefreshToken; + } + /** * Copy the current immutable object by setting a value for the {@link StorageCredential#prefix() * prefix} attribute. 
An equals check used to prevent copying of the same value by returning @@ -78,7 +90,8 @@ public Map config() { public final ImmutableStorageCredential withPrefix(String value) { String newValue = Objects.requireNonNull(value, "prefix"); if (this.prefix.equals(newValue)) return this; - return validate(new ImmutableStorageCredential(newValue, this.config)); + return validate( + new ImmutableStorageCredential(newValue, this.config, this.storageRefreshToken)); } /** @@ -92,7 +105,21 @@ public final ImmutableStorageCredential withPrefix(String value) { public final ImmutableStorageCredential withConfig(Map entries) { if (this.config == entries) return this; Map newValue = createSerializableMap(true, false, entries); - return validate(new ImmutableStorageCredential(this.prefix, newValue)); + return validate( + new ImmutableStorageCredential(this.prefix, newValue, this.storageRefreshToken)); + } + + /** + * Copy the current immutable object by setting a value for the {@link + * StorageCredential#storageRefreshToken() storageRefreshToken} attribute. An equals check used to + * prevent copying of the same value by returning {@code this}. 
+ * + * @param value A new value for storageRefreshToken (can be {@code null}) + * @return A modified copy of the {@code this} object + */ + public final ImmutableStorageCredential withStorageRefreshToken(@Nullable String value) { + if (Objects.equals(this.storageRefreshToken, value)) return this; + return validate(new ImmutableStorageCredential(this.prefix, this.config, value)); } /** @@ -109,7 +136,9 @@ public boolean equals(@Nullable Object another) { } private boolean equalTo(int synthetic, ImmutableStorageCredential another) { - return prefix.equals(another.prefix) && config.equals(another.config); + return prefix.equals(another.prefix) + && config.equals(another.config) + && Objects.equals(storageRefreshToken, another.storageRefreshToken); } /** @@ -122,6 +151,7 @@ public int hashCode() { @Var int h = 5381; h += (h << 5) + prefix.hashCode(); h += (h << 5) + config.hashCode(); + h += (h << 5) + Objects.hashCode(storageRefreshToken); return h; } @@ -132,7 +162,14 @@ public int hashCode() { */ @Override public String toString() { - return "StorageCredential{" + "prefix=" + prefix + ", config=" + config + "}"; + return "StorageCredential{" + + "prefix=" + + prefix + + ", config=" + + config + + ", storageRefreshToken=" + + storageRefreshToken + + "}"; } private static ImmutableStorageCredential validate(ImmutableStorageCredential instance) { @@ -187,6 +224,7 @@ public static ImmutableStorageCredential.Builder builder() { public static final class Builder { private String prefix; private Map config = new LinkedHashMap(); + @Nullable private String storageRefreshToken; private Builder() {} @@ -204,6 +242,10 @@ public final Builder from(StorageCredential instance) { Objects.requireNonNull(instance, "instance"); this.prefix(instance.prefix()); putAllConfig(instance.config()); + String tokenValue = instance.storageRefreshToken(); + if (tokenValue != null) { + this.storageRefreshToken(tokenValue); + } return this; } @@ -219,6 +261,19 @@ public final Builder 
prefix(String prefix) { return this; } + /** + * Initializes the value for the {@link StorageCredential#storageRefreshToken() + * storageRefreshToken} attribute. + * + * @param storageRefreshToken The value for storageRefreshToken (can be {@code null}) + * @return {@code this} builder for use in a chained invocation + */ + @CanIgnoreReturnValue + public final Builder storageRefreshToken(@Nullable String storageRefreshToken) { + this.storageRefreshToken = storageRefreshToken; + return this; + } + /** * Put one entry to the {@link StorageCredential#config() config} map. * @@ -295,7 +350,8 @@ public ImmutableStorageCredential build() { } return ImmutableStorageCredential.validate( - new ImmutableStorageCredential(prefix, createSerializableMap(false, false, config))); + new ImmutableStorageCredential( + prefix, createSerializableMap(false, false, config), storageRefreshToken)); } } diff --git a/core/src/main/java/org/apache/iceberg/io/StorageCredential.java b/core/src/main/java/org/apache/iceberg/io/StorageCredential.java index 7e935470c40a..f406602a93f4 100644 --- a/core/src/main/java/org/apache/iceberg/io/StorageCredential.java +++ b/core/src/main/java/org/apache/iceberg/io/StorageCredential.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Map; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public interface StorageCredential extends Serializable { @@ -28,6 +29,11 @@ public interface StorageCredential extends Serializable { Map config(); + @Nullable + default String storageRefreshToken() { + return null; + } + default void validate() { Preconditions.checkArgument(!prefix().isEmpty(), "Invalid prefix: must be non-empty"); Preconditions.checkArgument(!config().isEmpty(), "Invalid config: must be non-empty"); @@ -36,4 +42,13 @@ default void validate() { static StorageCredential create(String prefix, Map config) { return 
ImmutableStorageCredential.builder().prefix(prefix).config(config).build(); } + + static StorageCredential create( + String prefix, Map config, @Nullable String storageRefreshToken) { + return ImmutableStorageCredential.builder() + .prefix(prefix) + .config(config) + .storageRefreshToken(storageRefreshToken) + .build(); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index cda81b1d0d65..4f97b7a04bc9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -48,6 +48,7 @@ private RESTCatalogProperties() {} public static final String SCAN_PLANNING_MODE = "scan-planning-mode"; public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id"; + public static final String REST_STORAGE_REFRESH_TOKEN = "rest-storage-refresh-token"; // Properties that control the behaviour of the table cache used for freshness-aware table // loading. 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cbdf17a8ebbe..69f0a1bca265 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -1191,7 +1191,7 @@ private FileIO newFileIO( && fileIO instanceof SupportsStorageCredentials ioWithCredentials) { ioWithCredentials.setCredentials( storageCredentials.stream() - .map(c -> StorageCredential.create(c.prefix(), c.config())) + .map(c -> StorageCredential.create(c.prefix(), c.config(), c.storageRefreshToken())) .collect(Collectors.toList())); } return fileIO; @@ -1202,7 +1202,7 @@ private FileIO newFileIO( properties, conf, storageCredentials.stream() - .map(c -> StorageCredential.create(c.prefix(), c.config())) + .map(c -> StorageCredential.create(c.prefix(), c.config(), c.storageRefreshToken())) .collect(Collectors.toList())); } } diff --git a/core/src/main/java/org/apache/iceberg/rest/credentials/Credential.java b/core/src/main/java/org/apache/iceberg/rest/credentials/Credential.java index 0bd6673384de..4e6e25d6df6d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/credentials/Credential.java +++ b/core/src/main/java/org/apache/iceberg/rest/credentials/Credential.java @@ -19,6 +19,7 @@ package org.apache.iceberg.rest.credentials; import java.util.Map; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.immutables.value.Value; @@ -28,6 +29,12 @@ public interface Credential { Map config(); + @Value.Default + @Nullable + default String storageRefreshToken() { + return null; + } + @Value.Check default void validate() { Preconditions.checkArgument(!prefix().isEmpty(), "Invalid prefix: must be non-empty"); diff --git a/core/src/main/java/org/apache/iceberg/rest/credentials/CredentialParser.java 
b/core/src/main/java/org/apache/iceberg/rest/credentials/CredentialParser.java index 14314d6e4fb2..b8b2224179e5 100644 --- a/core/src/main/java/org/apache/iceberg/rest/credentials/CredentialParser.java +++ b/core/src/main/java/org/apache/iceberg/rest/credentials/CredentialParser.java @@ -28,6 +28,7 @@ public class CredentialParser { private static final String PREFIX = "prefix"; private static final String CONFIG = "config"; + private static final String STORAGE_REFRESH_TOKEN = "storage-refresh-token"; private CredentialParser() {} @@ -47,6 +48,10 @@ public static void toJson(Credential credential, JsonGenerator gen) throws IOExc gen.writeStringField(PREFIX, credential.prefix()); JsonUtil.writeStringMap(CONFIG, credential.config(), gen); + if (credential.storageRefreshToken() != null) { + gen.writeStringField(STORAGE_REFRESH_TOKEN, credential.storageRefreshToken()); + } + gen.writeEndObject(); } @@ -58,6 +63,14 @@ public static Credential fromJson(JsonNode json) { Preconditions.checkArgument(null != json, "Cannot parse credential from null object"); String prefix = JsonUtil.getString(PREFIX, json); Map config = JsonUtil.getStringMap(CONFIG, json); - return ImmutableCredential.builder().prefix(prefix).config(config).build(); + + ImmutableCredential.Builder builder = + ImmutableCredential.builder().prefix(prefix).config(config); + + if (json.has(STORAGE_REFRESH_TOKEN)) { + builder.storageRefreshToken(JsonUtil.getString(STORAGE_REFRESH_TOKEN, json)); + } + + return builder.build(); } } diff --git a/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java b/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java index 63e5dd304d9b..949b1377d266 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java +++ b/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java @@ -56,4 +56,34 @@ public void serialization(TestHelpers.RoundTripSerializer rou "randomPrefix", ImmutableMap.of("token1", "storageToken1", "token2", 
"storageToken2")); assertThat(roundTripSerializer.apply(credential)).isEqualTo(credential); } + + @ParameterizedTest + @MethodSource("org.apache.iceberg.TestHelpers#serializers") + public void serializationWithStorageRefreshToken( + TestHelpers.RoundTripSerializer roundTripSerializer) + throws IOException, ClassNotFoundException { + StorageCredential credential = + StorageCredential.create( + "s3://bucket/prefix", + ImmutableMap.of("token1", "storageToken1", "token2", "storageToken2"), + "opaque-refresh-token-abc"); + StorageCredential deserialized = roundTripSerializer.apply(credential); + assertThat(deserialized).isEqualTo(credential); + assertThat(deserialized.storageRefreshToken()).isEqualTo("opaque-refresh-token-abc"); + } + + @ParameterizedTest + @MethodSource("org.apache.iceberg.TestHelpers#serializers") + public void serializationWithNullStorageRefreshToken( + TestHelpers.RoundTripSerializer roundTripSerializer) + throws IOException, ClassNotFoundException { + StorageCredential credential = + StorageCredential.create( + "s3://bucket/prefix", + ImmutableMap.of("token1", "storageToken1", "token2", "storageToken2"), + null); + StorageCredential deserialized = roundTripSerializer.apply(credential); + assertThat(deserialized).isEqualTo(credential); + assertThat(deserialized.storageRefreshToken()).isNull(); + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index 8ba5daef3f9b..9ae96817468d 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -77,6 +77,8 @@ import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; +import 
org.apache.iceberg.rest.responses.LoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.util.Pair; @@ -531,6 +533,11 @@ public T handleRequest( break; } + case LOAD_CREDENTIALS: + { + return castResponse(responseType, handleLoadCredentials(vars)); + } + default: if (responseType == OAuthTokenResponse.class) { return castResponse(responseType, handleOAuthRequest(body)); @@ -540,6 +547,10 @@ public T handleRequest( return null; } + protected LoadCredentialsResponse handleLoadCredentials(Map vars) { + return ImmutableLoadCredentialsResponse.builder().build(); + } + private static Map defaultQueryParams() { return Map.of( RESTCatalogProperties.SNAPSHOTS_QUERY_PARAMETER, diff --git a/core/src/test/java/org/apache/iceberg/rest/Route.java b/core/src/test/java/org/apache/iceberg/rest/Route.java index 8680915bff64..0c3047aeef1f 100644 --- a/core/src/test/java/org/apache/iceberg/rest/Route.java +++ b/core/src/test/java/org/apache/iceberg/rest/Route.java @@ -41,6 +41,7 @@ import org.apache.iceberg.rest.responses.GetNamespaceResponse; import org.apache.iceberg.rest.responses.ListNamespacesResponse; import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; @@ -137,7 +138,12 @@ enum Route { FetchScanTasksRequest.class, FetchScanTasksResponse.class), CANCEL_PLAN_TABLE_SCAN( - HTTPRequest.HTTPMethod.DELETE, ResourcePaths.V1_TABLE_SCAN_PLAN, null, null); + HTTPRequest.HTTPMethod.DELETE, ResourcePaths.V1_TABLE_SCAN_PLAN, null, null), + LOAD_CREDENTIALS( + HTTPRequest.HTTPMethod.GET, + ResourcePaths.V1_TABLE_CREDENTIALS, + null, + LoadCredentialsResponse.class); private final HTTPRequest.HTTPMethod method; private final 
int requiredLength; diff --git a/core/src/test/java/org/apache/iceberg/rest/TestStorageCredentialRefresh.java b/core/src/test/java/org/apache/iceberg/rest/TestStorageCredentialRefresh.java new file mode 100644 index 000000000000..56277d1fd7ce --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestStorageCredentialRefresh.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.auth.AuthSession; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.credentials.ImmutableCredential; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types; +import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * E2E tests verifying the storage-refresh-token HTTP round-trip through the REST catalog protocol + * for staged table creation. 
+ * + * <p>
The adapter implements the proposal behavior: (1) staged table creation returns credentials + * with a storage-refresh-token, (2) loadCredentials with the token returns fresh credentials with a + * rotated token, (3) loadCredentials without a token for a staged table returns 404, (4) concurrent + * staged creates of the same table name are isolated by their tokens. + */ +class TestStorageCredentialRefresh { + + @TempDir Path temp; + + private Server httpServer; + private InMemoryCatalog backendCatalog; + private CredentialVendingAdapter adapter; + + /** + * Simulates a server implementing the storage-refresh-token proposal. Tracks staged table + * sessions and vends credentials keyed by opaque tokens. + */ + static class CredentialVendingAdapter extends RESTCatalogAdapter { + private final ConcurrentMap stagedTables = Maps.newConcurrentMap(); + + private final AtomicInteger credentialVersion = new AtomicInteger(0); + + CredentialVendingAdapter(InMemoryCatalog catalog) { + super(catalog); + } + + Map stagedTables() { + return stagedTables; + } + + record StagedTableContext(TableIdentifier ident, String location) {} + + @SuppressWarnings("unchecked") + @Override + public T handleRequest( + Route route, + Map vars, + HTTPRequest httpRequest, + Class responseType, + Consumer> responseHeaders) { + T response = super.handleRequest(route, vars, httpRequest, responseType, responseHeaders); + + if (route == Route.CREATE_TABLE && response instanceof LoadTableResponse tableResponse) { + Object body = httpRequest.body(); + CreateTableRequest createReq = (CreateTableRequest) body; + if (createReq.stageCreate()) { + String token = UUID.randomUUID().toString(); + String location = tableResponse.tableMetadata().location(); + Namespace ns = RESTUtil.decodeNamespace(vars.get("namespace"), "%2E"); + TableIdentifier ident = TableIdentifier.of(ns, createReq.name()); + stagedTables.put(token, new StagedTableContext(ident, location)); + + Credential credWithToken = 
vendCredential(location, token); + return (T) + LoadTableResponse.builder() + .withTableMetadata(tableResponse.tableMetadata()) + .addAllConfig(tableResponse.config()) + .addCredential(credWithToken) + .build(); + } + } + + return response; + } + + @Override + protected LoadCredentialsResponse handleLoadCredentials(Map vars) { + String storageRefreshToken = vars.get("storageRefreshToken"); + + if (storageRefreshToken != null) { + StagedTableContext ctx = stagedTables.get(storageRefreshToken); + if (ctx == null) { + throw new NoSuchTableException( + "No staged table found for storageRefreshToken: %s", storageRefreshToken); + } + + String rotatedToken = UUID.randomUUID().toString(); + stagedTables.remove(storageRefreshToken); + stagedTables.put(rotatedToken, ctx); + + return ImmutableLoadCredentialsResponse.builder() + .addCredentials(vendCredential(ctx.location(), rotatedToken)) + .build(); + } + + throw new NoSuchTableException( + "Table is staged and not committed. Provide a storageRefreshToken to refresh credentials."); + } + + private Credential vendCredential(String prefix, String token) { + int version = credentialVersion.incrementAndGet(); + String expiresAtMs = Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli()); + return ImmutableCredential.builder() + .prefix(prefix) + .putConfig("s3.access-key-id", "AKIA-v" + version) + .putConfig("s3.secret-access-key", "secret-v" + version) + .putConfig("s3.session-token", "session-v" + version) + .putConfig("s3.session-token-expires-at-ms", expiresAtMs) + .storageRefreshToken(token) + .build(); + } + } + + @BeforeEach + void before() throws Exception { + File warehouse = temp.toFile(); + this.backendCatalog = new InMemoryCatalog(); + this.backendCatalog.initialize( + "in-memory", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); + + this.adapter = new CredentialVendingAdapter(backendCatalog); + + ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.NO_SESSIONS); + context.addServlet(new ServletHolder(new RESTCatalogServlet(adapter)), "/*"); + context.setHandler(new GzipHandler()); + + this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); + httpServer.setHandler(context); + httpServer.start(); + } + + @AfterEach + void after() throws Exception { + if (httpServer != null) { + httpServer.stop(); + } + + if (backendCatalog != null) { + backendCatalog.close(); + } + } + + @Test + void stagedTableCreateReturnsCredentialsWithRefreshToken() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + Transaction txn = + catalog + .buildTable( + TableIdentifier.of("ns", "staged_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + assertThat(txn).isNotNull(); + assertThat(adapter.stagedTables()).hasSize(1); + + StagedTableEntry entry = singleStagedEntry(); + assertThat(entry.ctx().ident()).isEqualTo(TableIdentifier.of("ns", "staged_t1")); + assertThat(entry.ctx().location()).isNotEmpty(); + } + } + + @Test + void stagedTableNotVisibleViaListTables() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog + .buildTable( + TableIdentifier.of("ns", "invisible"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + assertThat(catalog.listTables(Namespace.of("ns"))).isEmpty(); + } + } + + @Test + void loadCredentialsWithTokenReturnsFreshCredentials() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog + .buildTable( + TableIdentifier.of("ns", "refresh_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + StagedTableEntry entry = singleStagedEntry(); + String initialToken = entry.token(); + + 
try (RESTClient client = httpClient()) { + LoadCredentialsResponse response = + client.get( + "v1/namespaces/ns/tables/refresh_t1/credentials", + Map.of("storageRefreshToken", initialToken), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThat(response.credentials()).hasSize(1); + Credential refreshed = response.credentials().get(0); + assertThat(refreshed.config()).containsKey("s3.access-key-id"); + assertThat(refreshed.storageRefreshToken()).isNotNull().isNotEqualTo(initialToken); + assertThat(refreshed.prefix()).isEqualTo(entry.ctx().location()); + } + } + } + + @Test + void loadCredentialsRotatesToken() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog + .buildTable( + TableIdentifier.of("ns", "rotate_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + StagedTableEntry entry = singleStagedEntry(); + String token = entry.token(); + + try (RESTClient client = httpClient()) { + LoadCredentialsResponse first = + client.get( + "v1/namespaces/ns/tables/rotate_t1/credentials", + Map.of("storageRefreshToken", token), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + String rotatedToken = first.credentials().get(0).storageRefreshToken(); + assertThat(rotatedToken).isNotEqualTo(token); + + LoadCredentialsResponse second = + client.get( + "v1/namespaces/ns/tables/rotate_t1/credentials", + Map.of("storageRefreshToken", rotatedToken), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThat(second.credentials().get(0).storageRefreshToken()) + .isNotEqualTo(rotatedToken) + .isNotEqualTo(token); + assertThat(second.credentials().get(0).config().get("s3.access-key-id")) + .isNotEqualTo(first.credentials().get(0).config().get("s3.access-key-id")); + } + } + } + + @Test + void 
loadCredentialsWithoutTokenReturns404ForStagedTable() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog + .buildTable( + TableIdentifier.of("ns", "no_token_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + try (RESTClient client = httpClient()) { + assertThatThrownBy( + () -> + client.get( + "v1/namespaces/ns/tables/no_token_t1/credentials", + null, + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.tableErrorHandler())) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("staged"); + } + } + } + + @Test + void loadCredentialsWithStaleTokenReturns404() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog + .buildTable( + TableIdentifier.of("ns", "stale_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + StagedTableEntry entry = singleStagedEntry(); + String initialToken = entry.token(); + + try (RESTClient client = httpClient()) { + client.get( + "v1/namespaces/ns/tables/stale_t1/credentials", + Map.of("storageRefreshToken", initialToken), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThatThrownBy( + () -> + client.get( + "v1/namespaces/ns/tables/stale_t1/credentials", + Map.of("storageRefreshToken", initialToken), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.tableErrorHandler())) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("storageRefreshToken"); + } + } + } + + @Test + void concurrentStagedTablesAreIsolatedByToken() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + + catalog + .buildTable( + TableIdentifier.of("ns", "same_name"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + 
.withProperty("write.format.default", "parquet") + .createTransaction(); + + // The first stage-create registers the table, preventing a second stage-create + // of the same name. But we can verify that the token-based isolation works by + // staging a different table and verifying separate tokens. + catalog + .buildTable( + TableIdentifier.of("ns", "other_table"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + assertThat(adapter.stagedTables()).hasSize(2); + + String[] tokens = adapter.stagedTables().keySet().toArray(new String[0]); + CredentialVendingAdapter.StagedTableContext ctx0 = adapter.stagedTables().get(tokens[0]); + CredentialVendingAdapter.StagedTableContext ctx1 = adapter.stagedTables().get(tokens[1]); + assertThat(ctx0.ident()).isNotEqualTo(ctx1.ident()); + + try (RESTClient client = httpClient()) { + LoadCredentialsResponse resp0 = + client.get( + "v1/namespaces/ns/tables/" + ctx0.ident().name() + "/credentials", + Map.of("storageRefreshToken", tokens[0]), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + LoadCredentialsResponse resp1 = + client.get( + "v1/namespaces/ns/tables/" + ctx1.ident().name() + "/credentials", + Map.of("storageRefreshToken", tokens[1]), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThat(resp0.credentials().get(0).prefix()) + .isNotEqualTo(resp1.credentials().get(0).prefix()); + } + } + } + + @Test + void multiPrefixTokensRefreshIndependently() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + + catalog + .buildTable( + TableIdentifier.of("ns", "table_a"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + catalog + .buildTable( + TableIdentifier.of("ns", "table_b"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .createTransaction(); + + 
assertThat(adapter.stagedTables()).hasSize(2); + + String[] tokens = adapter.stagedTables().keySet().toArray(new String[0]); + CredentialVendingAdapter.StagedTableContext ctxA = adapter.stagedTables().get(tokens[0]); + CredentialVendingAdapter.StagedTableContext ctxB = adapter.stagedTables().get(tokens[1]); + + try (RESTClient client = httpClient()) { + LoadCredentialsResponse respA = + client.get( + "v1/namespaces/ns/tables/" + ctxA.ident().name() + "/credentials", + Map.of("storageRefreshToken", tokens[0]), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThat(respA.credentials()).hasSize(1); + String rotatedTokenA = respA.credentials().get(0).storageRefreshToken(); + assertThat(rotatedTokenA).isNotNull().isNotEqualTo(tokens[0]); + + // table_b's original token must still be valid after table_a refreshed + LoadCredentialsResponse respB = + client.get( + "v1/namespaces/ns/tables/" + ctxB.ident().name() + "/credentials", + Map.of("storageRefreshToken", tokens[1]), + LoadCredentialsResponse.class, + Map.of(), + ErrorHandlers.defaultErrorHandler()); + + assertThat(respB.credentials()).hasSize(1); + assertThat(respB.credentials().get(0).storageRefreshToken()) + .isNotNull() + .isNotEqualTo(tokens[1]); + assertThat(respA.credentials().get(0).prefix()) + .isNotEqualTo(respB.credentials().get(0).prefix()); + } + } + } + + @Test + void regularCreateTableDoesNotVendRefreshToken() throws Exception { + try (RESTCatalog catalog = createCatalog()) { + catalog.createNamespace(Namespace.of("ns")); + catalog.createTable( + TableIdentifier.of("ns", "regular_t1"), + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))); + + assertThat(adapter.stagedTables()).isEmpty(); + } + } + + private record StagedTableEntry(String token, CredentialVendingAdapter.StagedTableContext ctx) {} + + private StagedTableEntry singleStagedEntry() { + assertThat(adapter.stagedTables()).hasSize(1); + Map.Entry entry = + 
adapter.stagedTables().entrySet().iterator().next(); + return new StagedTableEntry(entry.getKey(), entry.getValue()); + } + + private RESTClient httpClient() { + return HTTPClient.builder(ImmutableMap.of()) + .uri(httpServer.getURI().toString()) + .withHeaders(RESTUtil.configHeaders(ImmutableMap.of())) + .build() + .withAuthSession(AuthSession.EMPTY); + } + + private RESTCatalog createCatalog() { + RESTCatalog catalog = new RESTCatalog(); + catalog.initialize( + "test", + ImmutableMap.of( + CatalogProperties.URI, + httpServer.getURI().toString(), + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO", + "credential", + "catalog:12345")); + return catalog; + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/credentials/TestCredentialParser.java b/core/src/test/java/org/apache/iceberg/rest/credentials/TestCredentialParser.java index a48fd7353b98..530ee16359db 100644 --- a/core/src/test/java/org/apache/iceberg/rest/credentials/TestCredentialParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/credentials/TestCredentialParser.java @@ -113,6 +113,48 @@ public void gcsCredential() { .isEqualTo(expectedJson); } + @Test + public void s3CredentialWithStorageRefreshToken() { + Credential credential = + ImmutableCredential.builder() + .prefix("s3://custom-uri") + .config( + ImmutableMap.of( + "s3.access-key-id", + "keyId", + "s3.secret-access-key", + "accessKey", + "s3.session-token", + "sessionToken")) + .storageRefreshToken("opaque-refresh-token-123") + .build(); + + String expectedJson = + "{\n" + + " \"prefix\" : \"s3://custom-uri\",\n" + + " \"config\" : {\n" + + " \"s3.access-key-id\" : \"keyId\",\n" + + " \"s3.secret-access-key\" : \"accessKey\",\n" + + " \"s3.session-token\" : \"sessionToken\"\n" + + " },\n" + + " \"storage-refresh-token\" : \"opaque-refresh-token-123\"\n" + + "}"; + + String json = CredentialParser.toJson(credential, true); + assertThat(json).isEqualTo(expectedJson); + + Credential parsed = 
CredentialParser.fromJson(json); + assertThat(parsed.storageRefreshToken()).isEqualTo("opaque-refresh-token-123"); + assertThat(CredentialParser.toJson(parsed, true)).isEqualTo(expectedJson); + } + + @Test + public void storageRefreshTokenAbsentBackwardCompat() { + String json = "{\"prefix\": \"s3://bucket\", \"config\": {\"s3.access-key-id\": \"key\"}}"; + Credential credential = CredentialParser.fromJson(json); + assertThat(credential.storageRefreshToken()).isNull(); + } + @Test public void adlsCredential() { Credential credential = diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadCredentialsResponseParser.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadCredentialsResponseParser.java index f2e723da2540..580516dc71b8 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadCredentialsResponseParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadCredentialsResponseParser.java @@ -109,4 +109,58 @@ public void roundTripSerde() { assertThat(json).isEqualTo(expectedJson); assertThat(LoadCredentialsResponseParser.fromJson(json)).isEqualTo(response); } + + @Test + public void roundTripSerdeWithStorageRefreshToken() { + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder() + .addCredentials( + ImmutableCredential.builder() + .prefix("s3://custom-uri") + .config( + ImmutableMap.of( + "s3.access-key-id", + "keyId", + "s3.secret-access-key", + "accessKey", + "s3.session-token", + "sessionToken")) + .storageRefreshToken("refresh-token-abc") + .build()) + .addCredentials( + ImmutableCredential.builder() + .prefix("gs://custom-uri") + .config( + ImmutableMap.of( + "gcs.oauth2.token", "gcsToken1", "gcs.oauth2.token-expires-at", "1000")) + .build()) + .build(); + + String expectedJson = + "{\n" + + " \"storage-credentials\" : [ {\n" + + " \"prefix\" : \"s3://custom-uri\",\n" + + " \"config\" : {\n" + + " \"s3.access-key-id\" : \"keyId\",\n" + + " 
\"s3.secret-access-key\" : \"accessKey\",\n" + + " \"s3.session-token\" : \"sessionToken\"\n" + + " },\n" + + " \"storage-refresh-token\" : \"refresh-token-abc\"\n" + + " }, {\n" + + " \"prefix\" : \"gs://custom-uri\",\n" + + " \"config\" : {\n" + + " \"gcs.oauth2.token\" : \"gcsToken1\",\n" + + " \"gcs.oauth2.token-expires-at\" : \"1000\"\n" + + " }\n" + + " } ]\n" + + "}"; + + String json = LoadCredentialsResponseParser.toJson(response, true); + assertThat(json).isEqualTo(expectedJson); + + LoadCredentialsResponse parsed = LoadCredentialsResponseParser.fromJson(json); + assertThat(parsed).isEqualTo(response); + assertThat(parsed.credentials().get(0).storageRefreshToken()).isEqualTo("refresh-token-abc"); + assertThat(parsed.credentials().get(1).storageRefreshToken()).isNull(); + } } diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index f8b3f5bd3771..148ce92ff881 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -566,6 +566,11 @@ class StorageCredential(BaseModel): description='Indicates a storage location prefix where the credential is relevant. Clients should choose the most specific prefix (by selecting the longest prefix) if several credentials of the same type are available.', ) config: dict[str, str] + storage_refresh_token: str | None = Field( + None, + alias='storage-refresh-token', + description="Opaque token that clients pass back to the server to refresh this credential. Returned on the loadCredentials or loadTable endpoint via the storageRefreshToken query parameter. 
When present, the server uses this token to resolve the credential context (e.g., a staged table's storage location) and return fresh credentials.\n", + ) class LoadCredentialsResponse(BaseModel): diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index 4b4c9f6730ec..5f2738eb0a28 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -569,6 +569,13 @@ paths: the client sends all create and subsequent changes to the table commit route. Changes from the table create operation include changes like AddSchemaUpdate and SetCurrentSchemaUpdate that set the initial table state. + + + When `stage-create` is true, each `StorageCredential` in the response may include a + `storage-refresh-token` field containing an opaque token. Clients should pass this token back + on subsequent `loadCredentials` requests (via the `storageRefreshToken` query + parameter) to refresh credentials for the staged table. Staged tables are not visible via + `listTables` and are only accessible to clients that know the storage refresh token. operationId: createTable parameters: - $ref: '#/components/parameters/data-access' @@ -1227,7 +1234,17 @@ paths: type: string description: The plan ID that has been used for server-side scan planning - $ref: '#/components/parameters/referenced-by' - description: Load vended credentials for a table from the catalog. + - $ref: '#/components/parameters/storage-refresh-token' + description: + Load vended credentials for a table from the catalog. + + + An optional `storageRefreshToken` query parameter can be provided to refresh credentials + for a staged table. See the `storageRefreshToken` parameter definition for detailed behavior. + + + The `planId` and `storageRefreshToken` parameters are mutually exclusive. If both are + provided, the server must return a 400 Bad Request error. 
responses: 200: $ref: '#/components/responses/LoadCredentialsResponse' @@ -1239,7 +1256,9 @@ paths: $ref: '#/components/responses/ForbiddenResponse' 404: description: - Not Found - NoSuchTableException, table to load credentials for does not exist + Not Found - NoSuchTableException, table to load credentials for does not exist. + Also returned when a `storageRefreshToken` parameter is provided but does not match + any staged or committed table for the given table name. content: application/json: schema: @@ -1979,6 +1998,29 @@ components: schema: type: string + storage-refresh-token: + name: storageRefreshToken + in: query + description: > + Opaque token returned on a `StorageCredential` from a previous response (e.g., staged table + creation, loadTable, or loadCredentials). When provided, the server uses this token to resolve + the credential context (e.g., a staged table's storage location) and return fresh credentials. + + + If the token resolves to a staged or committed table matching the table name in the request + path, the server returns the response for that table. If the token does not match any staged + or committed table for the given table name, the server returns a 404 error. + + + If this parameter is omitted, the server performs the standard committed table lookup by name. + + + This parameter is mutually exclusive with `planId`. If both are provided on the credentials + endpoint, the server must return a 400 Bad Request error. + required: false + schema: + type: string + view: name: view in: path @@ -3470,6 +3512,13 @@ components: type: object additionalProperties: type: string + storage-refresh-token: + type: string + description: > + Opaque token that clients pass back to the server to refresh this credential. + Returned on the loadCredentials or loadTable endpoint via the storageRefreshToken + query parameter. 
When present, the server uses this token to resolve the credential + context (e.g., a staged table's storage location) and return fresh credentials. LoadCredentialsResponse: type: object diff --git a/spark/v4.1/build.gradle b/spark/v4.1/build.gradle index 02e4323e709e..c17112b5280b 100644 --- a/spark/v4.1/build.gradle +++ b/spark/v4.1/build.gradle @@ -266,6 +266,17 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") integrationCompileOnly project(':iceberg-api') + integrationCompileOnly project(':iceberg-core') + integrationCompileOnly project(path: ':iceberg-bundled-guava', configuration: 'shadow') + integrationCompileOnly project(':iceberg-aws') + integrationCompileOnly project(path: ':iceberg-core', configuration: 'testArtifacts') + integrationCompileOnly libs.jetty.servlet + integrationImplementation(platform(libs.awssdk.bom)) + integrationImplementation("software.amazon.awssdk:sts") + integrationImplementation("software.amazon.awssdk:s3") + integrationImplementation("software.amazon.awssdk:kms") + integrationImplementation("software.amazon.awssdk:url-connection-client") + integrationImplementation("software.amazon.awssdk:auth") } shadowJar { @@ -323,4 +334,3 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio apply from: "${rootDir}/runtime-deps.gradle" } - diff --git a/spark/v4.1/spark-runtime/src/integration/java/org/apache/iceberg/rest/TestStagedTableCredentialRefreshE2E.java b/spark/v4.1/spark-runtime/src/integration/java/org/apache/iceberg/rest/TestStagedTableCredentialRefreshE2E.java new file mode 100644 index 000000000000..ef7734c6056e --- /dev/null +++ b/spark/v4.1/spark-runtime/src/integration/java/org/apache/iceberg/rest/TestStagedTableCredentialRefreshE2E.java @@ -0,0 +1,370 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.credentials.ImmutableCredential; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; +import 
org.apache.iceberg.rest.responses.LoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; + +/** + * Integration test validating Spark CTAS with STS-vended credentials and storage-refresh-token + * credential refresh. + * + *

<p>Requires env vars: AWS_TEST_ACCESS_KEY_ID, AWS_TEST_SECRET_ACCESS_KEY, AWS_TEST_ROLE_ARN, + * S3_TEST_WAREHOUSE. Skipped when absent. AWS_TEST_REGION is optional (default us-west-2). + */ +class TestStagedTableCredentialRefreshE2E { + + private static final String ACCESS_KEY = System.getenv("AWS_TEST_ACCESS_KEY_ID"); + private static final String SECRET_KEY = System.getenv("AWS_TEST_SECRET_ACCESS_KEY"); + private static final String ROLE_ARN = System.getenv("AWS_TEST_ROLE_ARN"); + private static final String WAREHOUSE = System.getenv("S3_TEST_WAREHOUSE"); + private static final String REGION = System.getenv().getOrDefault("AWS_TEST_REGION", "us-west-2"); + + private static Server httpServer; + private static CredentialVendingRESTCatalogAdapter adapter; + private static SparkSession spark; + private static String serverUri; + private static S3Client s3Cleanup; + + private static Credentials assumeRoleForCleanup() { + try (StsClient sts = + StsClient.builder() + .region(Region.of(REGION)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY))) + .build()) { + return sts.assumeRole( + AssumeRoleRequest.builder() + .roleArn(ROLE_ARN) + .roleSessionName("iceberg-test-cleanup") + .durationSeconds(900) + .build()) + .credentials(); + } + } + + @BeforeAll + static void beforeClass() throws Exception { + assumeThat(ACCESS_KEY != null && SECRET_KEY != null && ROLE_ARN != null && WAREHOUSE != null) + .isTrue(); + + InMemoryCatalog backendCatalog = new InMemoryCatalog(); + backendCatalog.initialize("in-memory", Map.of(CatalogProperties.WAREHOUSE_LOCATION, WAREHOUSE)); + + adapter = new CredentialVendingRESTCatalogAdapter(backendCatalog); + + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + context.addServlet(new ServletHolder(new RESTCatalogServlet(adapter)), "/*"); + context.setHandler(new GzipHandler()); + + httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); +
httpServer.setHandler(context); + httpServer.start(); + + serverUri = httpServer.getURI().toString(); + if (serverUri.endsWith("/")) { + serverUri = serverUri.substring(0, serverUri.length() - 1); + } + + spark = + SparkSession.builder() + .master("local[2]") + .config("spark.driver.host", "localhost") + .config("spark.driver.bindAddress", "localhost") + .config("spark.sql.catalog.cred_test", SparkCatalog.class.getName()) + .config("spark.sql.catalog.cred_test.type", "rest") + .config("spark.sql.catalog.cred_test.uri", serverUri) + .config( + "spark.sql.catalog.cred_test." + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.cred_test.client.region", REGION) + .config("spark.sql.catalog.cred_test.credential", "catalog:12345") + .config("spark.sql.defaultCatalog", "cred_test") + .getOrCreate(); + + spark.sql("CREATE NAMESPACE IF NOT EXISTS default"); + + Credentials cleanupCreds = assumeRoleForCleanup(); + s3Cleanup = + S3Client.builder() + .region(Region.of(REGION)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsSessionCredentials.create( + cleanupCreds.accessKeyId(), + cleanupCreds.secretAccessKey(), + cleanupCreds.sessionToken()))) + .build(); + } + + @AfterAll + static void afterClass() throws Exception { + if (spark != null) { + spark.sql("DROP TABLE IF EXISTS cred_test.default.ctas_test"); + spark.sql("DROP TABLE IF EXISTS cred_test.default.refresh_test"); + spark.close(); + } + + if (httpServer != null) { + httpServer.stop(); + } + + if (s3Cleanup != null) { + s3Cleanup.close(); + } + } + + @AfterEach + void cleanupS3() { + if (s3Cleanup == null || WAREHOUSE == null) { + return; + } + + URI warehouseUri = URI.create(WAREHOUSE); + String bucket = warehouseUri.getHost(); + String prefix = warehouseUri.getPath(); + if (prefix.startsWith("/")) { + prefix = prefix.substring(1); + } + + ListObjectsV2Response listing = + s3Cleanup.listObjectsV2( + 
ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()); + for (S3Object obj : listing.contents()) { + s3Cleanup.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(obj.key()).build()); + } + } + + @Test + void ctasWithVendedCredentialsWritesAndReads() { + spark.sql( + "CREATE TABLE cred_test.default.ctas_test USING iceberg " + + "AS SELECT 1 AS id, 'hello' AS data"); + + Dataset result = spark.sql("SELECT * FROM cred_test.default.ctas_test"); + assertThat(result.count()).isEqualTo(1); + assertThat(result.first().getAs("data").toString()).isEqualTo("hello"); + + assertThat(adapter.stagedTables()).isNotEmpty(); + } + + @Test + void credentialRefreshTriggeredByExpiredCredentials() { + adapter.enableDelayedResponse(true); + + spark.sql( + "CREATE TABLE cred_test.default.refresh_test USING iceberg " + + "AS SELECT 1 AS id, 'refresh' AS data"); + + assertThat(adapter.refreshCallCount()) + .as("Credential refresh should have been triggered") + .isGreaterThanOrEqualTo(1); + + Dataset result = spark.sql("SELECT * FROM cred_test.default.refresh_test"); + assertThat(result.count()).isEqualTo(1); + + adapter.enableDelayedResponse(false); + } + + /** + * Adapter that vends real STS credentials for staged table creation and handles credential + * refresh via the storageRefreshToken protocol. 
+ */ + static class CredentialVendingRESTCatalogAdapter extends RESTCatalogAdapter { + private final ConcurrentMap stagedTables = Maps.newConcurrentMap(); + private final AtomicInteger refreshCount = new AtomicInteger(0); + private volatile boolean delayEnabled = false; + + record StagedTableContext(TableIdentifier ident, String location) {} + + CredentialVendingRESTCatalogAdapter(InMemoryCatalog catalog) { + super(catalog); + } + + void enableDelayedResponse(boolean enabled) { + this.delayEnabled = enabled; + } + + Map stagedTables() { + return stagedTables; + } + + int refreshCallCount() { + return refreshCount.get(); + } + + @SuppressWarnings("unchecked") + @Override + public T handleRequest( + Route route, + Map vars, + HTTPRequest httpRequest, + Class responseType, + Consumer> responseHeaders) { + T response = super.handleRequest(route, vars, httpRequest, responseType, responseHeaders); + + if (route == Route.CREATE_TABLE && response instanceof LoadTableResponse tableResponse) { + Object body = httpRequest.body(); + CreateTableRequest createReq = (CreateTableRequest) body; + if (createReq.stageCreate()) { + if (delayEnabled) { + try { + TimeUnit.MILLISECONDS.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + String token = UUID.randomUUID().toString(); + String location = tableResponse.tableMetadata().location(); + Namespace ns = RESTUtil.decodeNamespace(vars.get("namespace"), "%2E"); + TableIdentifier ident = TableIdentifier.of(ns, createReq.name()); + stagedTables.put(token, new StagedTableContext(ident, location)); + + Credential cred = vendStsCredential(location, token); + + String credEndpoint = + String.format( + "v1/namespaces/%s/tables/%s/credentials", + RESTUtil.encodeNamespace(ns), createReq.name()); + + return (T) + LoadTableResponse.builder() + .withTableMetadata(tableResponse.tableMetadata()) + .addAllConfig(tableResponse.config()) + .addConfig(AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, 
credEndpoint) + .addConfig(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO") + .addConfig("client.region", REGION) + .addCredential(cred) + .build(); + } + } + + return response; + } + + @Override + protected LoadCredentialsResponse handleLoadCredentials(Map vars) { + String storageRefreshToken = vars.get("storageRefreshToken"); + if (storageRefreshToken != null) { + refreshCount.incrementAndGet(); + StagedTableContext ctx = stagedTables.get(storageRefreshToken); + if (ctx != null) { + String rotatedToken = UUID.randomUUID().toString(); + stagedTables.remove(storageRefreshToken); + stagedTables.put(rotatedToken, ctx); + return ImmutableLoadCredentialsResponse.builder() + .addCredentials(vendStsCredential(ctx.location(), rotatedToken)) + .build(); + } + } + + return ImmutableLoadCredentialsResponse.builder().build(); + } + + private Credential vendStsCredential(String prefix, String token) { + Credentials stsCreds = assumeRole(); + long expiresAtMs; + if (delayEnabled) { + expiresAtMs = Instant.now().plus(500, ChronoUnit.MILLIS).toEpochMilli(); + } else { + expiresAtMs = stsCreds.expiration().toEpochMilli(); + } + + return ImmutableCredential.builder() + .prefix(prefix) + .putConfig(S3FileIOProperties.ACCESS_KEY_ID, stsCreds.accessKeyId()) + .putConfig(S3FileIOProperties.SECRET_ACCESS_KEY, stsCreds.secretAccessKey()) + .putConfig(S3FileIOProperties.SESSION_TOKEN, stsCreds.sessionToken()) + .putConfig(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, Long.toString(expiresAtMs)) + .storageRefreshToken(token) + .build(); + } + + private Credentials assumeRole() { + try (StsClient sts = + StsClient.builder() + .region(Region.of(REGION)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY))) + .build()) { + AssumeRoleResponse resp = + sts.assumeRole( + AssumeRoleRequest.builder() + .roleArn(ROLE_ARN) + .roleSessionName("iceberg-cred-refresh-test-" + UUID.randomUUID()) + .durationSeconds(900) + 
.build()); + return resp.credentials(); + } + } + } +}