Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Core, AWS: Add storage-refresh-token support for staged table credent…
…ial refresh
  • Loading branch information
manisin committed Apr 3, 2026
commit 9a3383de873ac043bb2a1eb659de379e39dea7e0
19 changes: 15 additions & 4 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
Expand Down Expand Up @@ -415,11 +416,16 @@ private Map<String, PrefixedS3Client> clientByPrefix() {
.collect(Collectors.toList())
.forEach(
storageCredential -> {
Map<String, String> propertiesWithCredentials =
ImmutableMap.Builder<String, String> propsBuilder =
ImmutableMap.<String, String>builder()
.putAll(properties)
.putAll(storageCredential.config())
.buildKeepingLast();
.putAll(storageCredential.config());
if (storageCredential.storageRefreshToken() != null) {
propsBuilder.put(
RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN,
storageCredential.storageRefreshToken());
}
Map<String, String> propertiesWithCredentials = propsBuilder.buildKeepingLast();

localClientByPrefix.put(
storageCredential.prefix(),
Expand Down Expand Up @@ -461,11 +467,16 @@ private void refreshStorageCredentials() {
return;
}

// Note: the VendedCredentialsProvider is constructed from the original FileIO properties,
// which contain the initial storageRefreshToken. If the server rotates the token on each
// refresh response, subsequent refreshes will use the stale original token. Servers that
// implement single-use token rotation for storageRefreshToken should be aware that the
// S3FileIO refresh loop does not yet propagate rotated tokens across cycles.
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
List<StorageCredential> refreshed =
provider.fetchCredentials().credentials().stream()
.filter(c -> c.prefix().startsWith(ROOT_PREFIX))
.map(c -> StorageCredential.create(c.prefix(), c.config()))
.map(c -> StorageCredential.create(c.prefix(), c.config(), c.storageRefreshToken()))
.collect(Collectors.toList());

if (!refreshed.isEmpty() && !isResourceClosed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalogProperties;
Expand All @@ -53,6 +54,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
private final String catalogEndpoint;
private final String credentialsEndpoint;
private final String planId;
private final String storageRefreshToken;
private AuthManager authManager;
private AuthSession authSession;

Expand All @@ -69,6 +71,8 @@ private VendedCredentialsProvider(Map<String, String> properties) {
this.catalogEndpoint = properties.get(CatalogProperties.URI);
this.credentialsEndpoint = properties.get(URI);
this.planId = properties.getOrDefault(RESTCatalogProperties.REST_SCAN_PLAN_ID, null);
this.storageRefreshToken =
properties.getOrDefault(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, null);
}

@Override
Expand Down Expand Up @@ -108,10 +112,19 @@ private RESTClient httpClient() {
}

LoadCredentialsResponse fetchCredentials() {
Map<String, String> queryParams = null;
if (null != planId) {
queryParams = Maps.newHashMap();
queryParams.put("planId", planId);
} else if (null != storageRefreshToken) {
queryParams = Maps.newHashMap();
queryParams.put("storageRefreshToken", storageRefreshToken);
}

return httpClient()
.get(
credentialsEndpoint,
null != planId ? Map.of("planId", planId) : null,
queryParams,
LoadCredentialsResponse.class,
Map.of(),
ErrorHandlers.defaultErrorHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,86 @@ public void credentialRefreshSchedulesNextRefresh() {
}
}

@Test
public void refreshedCredentialsPreserveStorageRefreshToken() {
String nearExpiryMs = Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli());

StorageCredential initialCredential =
StorageCredential.create(
"s3://bucket/path",
ImmutableMap.of(
S3FileIOProperties.ACCESS_KEY_ID,
"initialAccessKey",
S3FileIOProperties.SECRET_ACCESS_KEY,
"initialSecretKey",
S3FileIOProperties.SESSION_TOKEN,
"initialToken",
S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
nearExpiryMs),
"initial-refresh-token");

String refreshedExpiryMs =
Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli());
String rotatedToken = "rotated-refresh-token";
LoadCredentialsResponse refreshResponse =
ImmutableLoadCredentialsResponse.builder()
.addCredentials(
ImmutableCredential.builder()
.prefix("s3://bucket/path")
.config(
ImmutableMap.of(
S3FileIOProperties.ACCESS_KEY_ID,
"refreshedAccessKey",
S3FileIOProperties.SECRET_ACCESS_KEY,
"refreshedSecretKey",
S3FileIOProperties.SESSION_TOKEN,
"refreshedToken",
S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
refreshedExpiryMs))
.storageRefreshToken(rotatedToken)
.build())
.build();

HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
HttpResponse mockResponse =
response(LoadCredentialsResponseParser.toJson(refreshResponse)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

Map<String, String> properties =
ImmutableMap.of(
AwsProperties.CLIENT_FACTORY,
StaticClientFactory.class.getName(),
VendedCredentialsProvider.URI,
CREDENTIALS_URI,
CatalogProperties.URI,
CATALOG_URI,
"init-creation-stacktrace",
"false");

StaticClientFactory.client = null;
try (S3FileIO fileIO = new S3FileIO()) {
fileIO.initialize(properties);
fileIO.setCredentials(List.of(initialCredential));

fileIO.client();

Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> mockServer.verify(mockRequest, VerificationTimes.atLeast(1)));

Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(
() -> {
List<StorageCredential> credentials = fileIO.credentials();
assertThat(credentials).hasSize(1);
assertThat(credentials.get(0).config())
.containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "refreshedAccessKey");
assertThat(credentials.get(0).storageRefreshToken()).isEqualTo(rotatedToken);
});
}
}

@Test
public void credentialRefreshWithinFiveMinuteWindow() {
// Set up credentials expiring within the next 5 minutes so the refresh triggers immediately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,136 @@ public void planIdQueryParamIsSent() {
mockServer.verify(mockRequest, VerificationTimes.exactly(2));
}

@Test
public void storageRefreshTokenQueryParamIsSent() {
String refreshToken = "opaque-refresh-token-123";
HttpRequest mockRequest =
request("/v1/credentials")
.withMethod(HttpMethod.GET.name())
.withQueryStringParameter("storageRefreshToken", refreshToken);
Credential credential =
ImmutableCredential.builder()
.prefix("s3")
.config(
ImmutableMap.of(
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKey",
S3FileIOProperties.SECRET_ACCESS_KEY,
"randomSecretAccessKey",
S3FileIOProperties.SESSION_TOKEN,
"sessionToken",
S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli())))
.build();
LoadCredentialsResponse response =
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build();

HttpResponse mockResponse =
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

Map<String, String> properties =
ImmutableMap.<String, String>builder()
.putAll(PROPERTIES)
.put(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, refreshToken)
.build();

try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
AwsCredentials awsCredentials = provider.resolveCredentials();
verifyCredentials(awsCredentials, credential);

AwsCredentials refreshedCredentials = provider.resolveCredentials();
assertThat(refreshedCredentials).isNotSameAs(awsCredentials);
verifyCredentials(refreshedCredentials, credential);
}

mockServer.verify(mockRequest, VerificationTimes.exactly(2));
}

@Test
public void storageRefreshTokenNotSentWhenNull() {
HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name());
Credential credential =
ImmutableCredential.builder()
.prefix("s3")
.config(
ImmutableMap.of(
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKey",
S3FileIOProperties.SECRET_ACCESS_KEY,
"randomSecretAccessKey",
S3FileIOProperties.SESSION_TOKEN,
"sessionToken",
S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli())))
.build();
LoadCredentialsResponse response =
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build();

HttpResponse mockResponse =
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
AwsCredentials awsCredentials = provider.resolveCredentials();
verifyCredentials(awsCredentials, credential);
}

mockServer.verify(
request("/v1/credentials")
.withMethod(HttpMethod.GET.name())
.withQueryStringParameter("storageRefreshToken"),
VerificationTimes.never());
}

@Test
public void planIdTakesPrecedenceOverStorageRefreshToken() {
String planId = "plan-123";
HttpRequest mockRequest =
request("/v1/credentials")
.withMethod(HttpMethod.GET.name())
.withQueryStringParameter("planId", planId);
Credential credential =
ImmutableCredential.builder()
.prefix("s3")
.config(
ImmutableMap.of(
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKey",
S3FileIOProperties.SECRET_ACCESS_KEY,
"randomSecretAccessKey",
S3FileIOProperties.SESSION_TOKEN,
"sessionToken",
S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli())))
.build();
LoadCredentialsResponse response =
ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build();

HttpResponse mockResponse =
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

Map<String, String> properties =
ImmutableMap.<String, String>builder()
.putAll(PROPERTIES)
.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
.put(RESTCatalogProperties.REST_STORAGE_REFRESH_TOKEN, "should-be-ignored")
.build();

try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
AwsCredentials awsCredentials = provider.resolveCredentials();
verifyCredentials(awsCredentials, credential);
}

mockServer.verify(mockRequest, VerificationTimes.atLeast(1));
mockServer.verify(
request("/v1/credentials")
.withMethod(HttpMethod.GET.name())
.withQueryStringParameter("storageRefreshToken"),
VerificationTimes.never());
}

private void verifyCredentials(AwsCredentials awsCredentials, Credential credential) {
assertThat(awsCredentials).isInstanceOf(AwsSessionCredentials.class);
AwsSessionCredentials creds = (AwsSessionCredentials) awsCredentials;
Expand Down
Loading
Loading