Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file. See [standa

### Added

- Typed error hierarchy for the Stream API per [CHA-2958](https://linear.app/stream/issue/CHA-2958). New checked subclasses of the existing `StreamException`:
* `StreamApiException` — HTTP 4xx/5xx with the `APIError` envelope parsed. Getters: `getStatusCode`, `getCode`, `getMessage`, `getExceptionFields`, `isUnrecoverable`, `getRawResponseBody`, `getMoreInfo`, `getDetails`. The `unrecoverable` and `details` fields were previously dropped on the floor.
* `StreamRateLimitException` — HTTP 429. Subclass of `StreamApiException` with `Duration getRetryAfter()` populated from `Retry-After` (RFC 7231 §7.1.3 integer seconds + HTTP-date). `null` when header absent or unparseable.
* `StreamTransportException` — connection reset, timeout, DNS, TLS, etc. Carries `getErrorType()` matching the logging spec enum (`connection_reset` / `timeout` / `dns_failure` / `tls_handshake_failed` / `unknown`). Cause chain preserved.
* `StreamTaskException` — async task observed with `status: "failed"`. Carries `getTaskId`, `getErrorType`, `getDescription`, `getStackTraceText`, `getVersion`. (`getStackTraceText` rather than `getStackTrace` to avoid colliding with `Throwable.getStackTrace()`.)
- `StreamSDKClient.waitForTask(taskId)` — main-source helper that polls the task endpoint to a terminal state. Returns `GetTaskResponse` on `completed`, throws `StreamTaskException` on `failed`, throws `StreamTransportException(errorType=timeout)` if the wait elapses (defaults: 1s poll, 60s timeout; overloaded with explicit `Duration`s). Replaces the test-only `ChatTestBase.waitForTask` (removed).
- Webhook handling spec helpers (CHA-2961): `UnknownEvent` class for forward-compat;
`gunzipPayload`, `decodeSqsPayload`, `decodeSnsPayload` primitives;
`verifyAndParseWebhook` HTTP composite; `parseSqs` / `parseSns`
Expand All @@ -32,6 +38,9 @@ All notable changes to this project will be documented in this file. See [standa

### Changed

- Exceptions remain **checked** (CHA-2958 §9.3). All new subclasses extend the existing checked `StreamException`, so `throws StreamException` declarations continue to compile and `catch (StreamException)` continues to handle every SDK error.
- `StreamRequest` now throws `StreamApiException` (or `StreamRateLimitException` for 429) for HTTP-response errors, and `StreamTransportException` for IO failures. The static type on declarations is still `StreamException` — these are subclasses. The static `StreamException.build(Throwable)` factory continues to wrap into a base `StreamException` (the request path classifies transport failures directly so this factory is no longer auto-routed). `StreamException.getResponseData()` is still populated on API exceptions for back-compat.
- The `APIError` envelope parser (formerly `StreamException.ResponseData`) gained the previously-dropped `details` and `unrecoverable` fields.
- **Default per-call `RequestTimeout` is now `30s` (was `10s`).** Aligns with CHA-2956 cross-SDK contract. The previous `10s` came from the hardcoded `timeout = 10000` ms in `StreamHTTPClient`. To keep the old ceiling, pass `new StreamClientOptions().setRequestTimeout(Duration.ofSeconds(10))`.
- Default idle-connection lifetime now `55s` (was `59s` via the `STREAM_API_CONNECTION_MAX_AGE` env var path). 55s sits 5s below the typical 60s LB idle timeout for safer eviction. `MaxConnsPerHost` default is unchanged at `5`.
- No other breaking changes. Existing `StreamSDKClient(apiKey, secret)`, `StreamSDKClient(apiKey, secret, OkHttpClient)`, and `StreamSDKClient(Properties)` constructors are preserved.
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/io/getstream/exceptions/RetryAfterParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.getstream.exceptions;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import org.jetbrains.annotations.Nullable;

/**
* Parses the HTTP {@code Retry-After} header per RFC 7231 §7.1.3.
*
* <p>Accepts either non-negative integer seconds (e.g. {@code 30}) or an HTTP-date (IMF-fixdate,
* e.g. {@code Fri, 31 Dec 2026 23:59:59 GMT}). HTTP-date values are converted to a delta from
* {@code now()}, clamped to {@code >= 0}. Returns {@code null} when the header is absent or
* unparseable — graceful per CHA-2958 §7.
*/
final class RetryAfterParser {
private RetryAfterParser() {}

@Nullable
static Duration parse(@Nullable String header) {
if (header == null) {
return null;
}
String trimmed = header.trim();
if (trimmed.isEmpty()) {
return null;
}
// Integer seconds path.
try {
long seconds = Long.parseLong(trimmed);
if (seconds < 0) {
return Duration.ZERO;
}
return Duration.ofSeconds(seconds);
} catch (NumberFormatException ignored) {
// Fall through to HTTP-date parsing.
}
// HTTP-date (IMF-fixdate). RFC 7231 §7.1.1.1 says HTTP/1.1 servers MUST generate this form.
try {
ZonedDateTime when = ZonedDateTime.parse(trimmed, DateTimeFormatter.RFC_1123_DATE_TIME);
Duration delta = Duration.between(ZonedDateTime.now(when.getZone()), when);
return delta.isNegative() ? Duration.ZERO : delta;
} catch (DateTimeParseException ignored) {
return null;
}
}
}
105 changes: 105 additions & 0 deletions src/main/java/io/getstream/exceptions/StreamApiException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.getstream.exceptions;

import java.util.Collections;
import java.util.Map;
import org.jetbrains.annotations.Nullable;

/**
* Thrown when the Stream API returns a 4xx/5xx response. Carries the parsed {@code APIError}
* envelope per CHA-2958 §5.1.
*
* <p>Checked subclass of {@link StreamException}: existing {@code throws StreamException}
* declarations continue to compile.
*/
public class StreamApiException extends StreamException {
private static final long serialVersionUID = 1L;

private final int statusCode;
private final int code;
private final Map<String, String> exceptionFields;
private final boolean unrecoverable;
private final String rawResponseBody;
@Nullable private final String moreInfo;
@Nullable private final Object details;

public StreamApiException(
String message,
int statusCode,
int code,
@Nullable Map<String, String> exceptionFields,
boolean unrecoverable,
String rawResponseBody,
@Nullable String moreInfo,
@Nullable Object details,
@Nullable Throwable cause) {
super(
message,
cause,
buildResponseData(
statusCode,
code,
message,
exceptionFields,
unrecoverable,
rawResponseBody,
moreInfo,
details));
this.statusCode = statusCode;
this.code = code;
this.exceptionFields = exceptionFields != null ? exceptionFields : Collections.emptyMap();
this.unrecoverable = unrecoverable;
this.rawResponseBody = rawResponseBody != null ? rawResponseBody : "";
this.moreInfo = moreInfo;
this.details = details;
}

private static ResponseData buildResponseData(
int statusCode,
int code,
String message,
@Nullable Map<String, String> exceptionFields,
boolean unrecoverable,
String rawResponseBody,
@Nullable String moreInfo,
@Nullable Object details) {
ResponseData rd = new ResponseData();
rd.setStatusCode(statusCode);
rd.setCode(code);
rd.setMessage(message);
rd.setExceptionFields(exceptionFields);
rd.setUnrecoverable(unrecoverable);
rd.setMoreInfo(moreInfo);
rd.setDetails(details);
return rd;
}

public int getStatusCode() {
return statusCode;
}

public int getCode() {
return code;
}

public Map<String, String> getExceptionFields() {
return exceptionFields;
}

public boolean isUnrecoverable() {
return unrecoverable;
}

public String getRawResponseBody() {
return rawResponseBody;
}

@Nullable
public String getMoreInfo() {
return moreInfo;
}

@Nullable
public Object getDetails() {
return details;
}
}
123 changes: 102 additions & 21 deletions src/main/java/io/getstream/exceptions/StreamException.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public StreamException(Throwable t) {
super(t);
}

// Allows subclasses (StreamApiException) to set both the cause and the back-compat responseData
// mirror in a single super() call. Package-private: only the exceptions package builds these.
StreamException(String message, Throwable cause, ResponseData responseData) {
super(message, cause);
this.responseData = responseData;
}

/**
* Builds a StreamException to signal an issue
*
Expand All @@ -40,10 +47,15 @@ public static StreamException build(String issue) {
}

/**
* Builds a StreamException using the response body when Stream API request fails
* Builds a typed API exception from the Stream API error body.
*
* @param responseBody Stream API response body
* @return the StreamException
* <p>Per CHA-2958 §6.2: parseable {@code APIError} envelope → {@link StreamApiException};
* unparseable body but HTTP layer succeeded → {@code StreamApiException} with {@code code=0} and
* the raw body preserved (§6.3).
*
* <p>This overload does not see the status code; callers that have a {@link Response} should
* prefer {@link #build(Response)} so 429 is routed to {@link StreamRateLimitException} and {@code
* statusCode} is preserved.
*/
public static StreamException build(ResponseBody responseBody) {
ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -52,44 +64,91 @@ public static StreamException build(ResponseBody responseBody) {
String responseBodyString = responseBody.string();
try {
ResponseData responseData = objectMapper.readValue(responseBodyString, ResponseData.class);
return new StreamException(responseData.getMessage(), responseData);
int status = responseData.getStatusCode() != null ? responseData.getStatusCode() : 0;
return apiExceptionFromResponseData(responseData, responseBodyString, status, null);
} catch (JsonProcessingException e) {
return new StreamException(responseBodyString, e);
return new StreamApiException(
"failed to parse error response", 0, 0, null, false, responseBodyString, null, null, e);
}
} catch (IOException e) {
return new StreamException(e);
}
}

/**
* Builds a StreamException based on response from the server and http code
*
* @param httpResponse Stream API response
* @return the StreamException
* Builds a typed API exception from an HTTP response. Per CHA-2958 §6.2: 429 → {@link
* StreamRateLimitException} with {@code Retry-After} parsed per RFC 7231 §7.1.3 (integer seconds
* or HTTP-date). Other 4xx/5xx → {@link StreamApiException}.
*/
public static StreamException build(Response httpResponse) {
StreamException exception;
int status = httpResponse.code();
String bodyString = "";
ResponseData parsed = null;
Throwable parseCause = null;

ResponseBody errorBody = httpResponse.body();
if (errorBody != null) {
exception = StreamException.build(errorBody);
} else {
exception =
StreamException.build(
String.format("Unexpected server response code %d", httpResponse.code()));
try {
bodyString = errorBody.string();
} catch (IOException e) {
// Body unreadable — treat as empty.
parseCause = e;
}
if (!bodyString.isEmpty()) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
parsed = objectMapper.readValue(bodyString, ResponseData.class);
} catch (JsonProcessingException e) {
parseCause = e;
}
}
}

if (exception.responseData == null) {
ResponseData responseData = new ResponseData();
responseData.statusCode = httpResponse.code();
exception.responseData = responseData;
if (parsed == null) {
String msg =
parseCause != null
? "failed to parse error response"
: String.format("Unexpected server response code %d", status);
if (status == 429) {
return new StreamRateLimitException(
msg,
status,
0,
null,
false,
bodyString,
null,
null,
RetryAfterParser.parse(httpResponse.header("Retry-After")),
parseCause);
}
return new StreamApiException(
msg, status, 0, null, false, bodyString, null, null, parseCause);
}

return exception;
if (status == 429) {
return new StreamRateLimitException(
parsed.getMessage() != null ? parsed.getMessage() : "rate limited",
status,
parsed.getCode() != null ? parsed.getCode() : 0,
parsed.getExceptionFields(),
parsed.getUnrecoverable() != null ? parsed.getUnrecoverable() : false,
bodyString,
parsed.getMoreInfo(),
parsed.getDetails(),
RetryAfterParser.parse(httpResponse.header("Retry-After")),
null);
}
return apiExceptionFromResponseData(parsed, bodyString, status, null);
}

/**
* Builds a StreamException when an exception occurs calling the API
* Builds a StreamException when an exception occurs calling the API.
*
* <p>Historic factory preserved for back-compat. The HTTP-call path now classifies transport
* failures directly via {@link StreamTransportException#fromIOException(IOException)} so this
* factory no longer auto-routes; it simply wraps the cause.
*
* @param t the underlying exception
* @return the StreamException
Expand All @@ -98,6 +157,22 @@ public static StreamException build(Throwable t) {
return new StreamException(t);
}

// statusCode comes from the HTTP layer (§5.1: "Source: HTTP status"). The envelope's
// StatusCode is only used as a fallback by build(ResponseBody), which has no live response.
private static StreamApiException apiExceptionFromResponseData(
ResponseData rd, String rawBody, int statusCode, Throwable cause) {
return new StreamApiException(
rd.getMessage() != null ? rd.getMessage() : "",
statusCode,
rd.getCode() != null ? rd.getCode() : 0,
rd.getExceptionFields(),
rd.getUnrecoverable() != null ? rd.getUnrecoverable() : false,
rawBody != null ? rawBody : "",
rd.getMoreInfo(),
rd.getDetails(),
cause);
}

@Data
public static class ResponseData {
@JsonProperty("code")
Expand All @@ -117,5 +192,11 @@ public static class ResponseData {

@JsonProperty("more_info")
private String moreInfo;

@JsonProperty("details")
private Object details;

@JsonProperty("unrecoverable")
private Boolean unrecoverable;
}
}
Loading
Loading