diff --git a/conformance-tests/client-jdk-http-client/pom.xml b/conformance-tests/client-jdk-http-client/pom.xml index e09d565c5..3e5cce5a8 100644 --- a/conformance-tests/client-jdk-http-client/pom.xml +++ b/conformance-tests/client-jdk-http-client/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT client-jdk-http-client jar @@ -28,7 +28,7 @@ io.modelcontextprotocol.sdk mcp - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT diff --git a/conformance-tests/client-spring-http-client/pom.xml b/conformance-tests/client-spring-http-client/pom.xml index cbf0d1970..a3bd41d08 100644 --- a/conformance-tests/client-spring-http-client/pom.xml +++ b/conformance-tests/client-spring-http-client/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT client-spring-http-client jar diff --git a/conformance-tests/pom.xml b/conformance-tests/pom.xml index 9512ddd34..d9d090c3c 100644 --- a/conformance-tests/pom.xml +++ b/conformance-tests/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT conformance-tests pom diff --git a/conformance-tests/server-servlet/pom.xml b/conformance-tests/server-servlet/pom.xml index 17b38542b..7ca06bbd9 100644 --- a/conformance-tests/server-servlet/pom.xml +++ b/conformance-tests/server-servlet/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT server-servlet jar @@ -28,7 +28,7 @@ io.modelcontextprotocol.sdk mcp - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml index 180dde0ef..32158fec0 100644 --- a/mcp-bom/pom.xml +++ b/mcp-bom/pom.xml @@ -7,7 +7,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-bom @@ -61,6 +61,13 @@ ${project.version} + + + io.modelcontextprotocol.sdk + mcp-resilience4j + ${project.version} + + diff --git a/mcp-core/pom.xml b/mcp-core/pom.xml index 4eabb8ec2..5a3c66d61 100644 --- 
a/mcp-core/pom.xml +++ b/mcp-core/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-core jar diff --git a/mcp-json-jackson2/pom.xml b/mcp-json-jackson2/pom.xml index 4ecbf98b4..89294e3c8 100644 --- a/mcp-json-jackson2/pom.xml +++ b/mcp-json-jackson2/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-json-jackson2 jar @@ -74,7 +74,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT com.networknt diff --git a/mcp-json-jackson3/pom.xml b/mcp-json-jackson3/pom.xml index 4f4c9ad1f..dae101cf7 100644 --- a/mcp-json-jackson3/pom.xml +++ b/mcp-json-jackson3/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-json-jackson3 jar @@ -68,7 +68,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT tools.jackson.core diff --git a/mcp-resilience4j/README.md b/mcp-resilience4j/README.md new file mode 100644 index 000000000..81e48d2a1 --- /dev/null +++ b/mcp-resilience4j/README.md @@ -0,0 +1,239 @@ +# mcp-resilience4j + +Resilience4j integration for the Java MCP SDK. Wraps any `McpClientTransport` with configurable circuit breaking, retry, rate limiting, time limiting, and bulkhead policies to make MCP tool calls resilient to transient failures, slow servers, and traffic spikes. + +## Overview + +MCP tool calls cross a network. Without resilience: + +- A slow server blocks a thread indefinitely +- A flaky server causes cascading failures upstream +- A burst of parallel agent calls can overwhelm a rate-limited endpoint +- One failing server keeps being called even though it cannot recover by itself + +`mcp-resilience4j` addresses all of these at the **transport level** — the single integration point exposed by the MCP SDK and frameworks like Google ADK. Because one transport wraps one MCP server connection, the policies are effectively per-server and composable across multiple clients. 
+ +## Maven Dependency + +```xml + + io.modelcontextprotocol.sdk + mcp-resilience4j + 2.1.1-SNAPSHOT + +``` + +Or via the BOM: + +```xml + + + + io.modelcontextprotocol.sdk + mcp-bom + 2.1.1-SNAPSHOT + pom + import + + + +``` + +## Quick Start + +### High-level facade: `McpResilienceConfig` + +```java +McpResilienceConfig config = McpResilienceConfig.builder() + .transportCircuitBreaker(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .transportRetry(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .transportTimeLimiter(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .build(); + +McpClientTransport resilientTransport = config.wrapTransport(rawTransport); +McpAsyncClient client = McpClient.async(resilientTransport).build(); +``` + +### Direct builder: `ResilientMcpClientTransport` + +For full control over all five policies: + +```java +McpClientTransport resilientTransport = ResilientMcpClientTransport.builder(rawTransport) + .circuitBreakerConfig(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .retryConfig(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .timeLimiterConfig(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .rateLimiterConfig(RateLimiterConfig.custom() + .limitForPeriod(20) + .limitRefreshPeriod(Duration.ofSeconds(1)) + .build()) + .bulkheadConfig(BulkheadConfig.custom() + .maxConcurrentCalls(10) + .build()) + .build(); +``` + +Policies are optional — configure only what you need. 
+ +## Policy Reference + +| Policy | Guards against | Exception thrown | Applied on | +|---|---|---|---| +| **CircuitBreaker** | Persistent server failures | `CallNotPermittedException` | `sendMessage`, `connect` | +| **Retry** | Transient failures | Last exception / `MaxRetriesExceededException` | `sendMessage`, `connect` | +| **TimeLimiter** | Slow servers exceeding deadline | `TimeoutException` | `sendMessage` only | +| **RateLimiter** | Request rate exceeding a threshold | `RequestNotPermitted` | `sendMessage` only | +| **Bulkhead** | Too many concurrent in-flight requests | `BulkheadFullException` | `sendMessage` only | + +`connect()` uses only CircuitBreaker and Retry — session establishment is not throttled or timed out, as it can legitimately take longer than a single request. + +## Policy Ordering + +Policies are applied in the following order (outermost to innermost): + +``` +Retry → CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead → MCP Server +``` + +This is the standard Resilience4j recommended hierarchy. Each position is deliberate: + +- **Retry is outermost** so it orchestrates the entire inner chain per attempt. If Retry were inside CircuitBreaker, the CB would only see the outcome of the entire retry loop — delaying its failure detection by `maxAttempts × timeout`. +- **Bulkhead is innermost** so concurrency slots are held only during actual execution, not during Retry's backoff sleep. If Bulkhead were outermost, a failing request would clog a slot for the full backoff duration, blocking healthy concurrent callers. +- **RateLimiter is inside Retry** so each retry attempt consumes a rate token. This ensures your local token count matches the actual number of requests the server receives. +- **TimeLimiter is per-attempt** so each retry gets a fresh timeout window rather than sharing one budget across all attempts. 
+ +## Using Shared Registries + +Register instances by name to observe policy state via Micrometer or the Resilience4j admin endpoints: + +```java +CircuitBreakerRegistry registry = CircuitBreakerRegistry.ofDefaults(); + +// Each transport gets a unique name — essential when sharing a registry +McpClientTransport weatherTransport = ResilientMcpClientTransport.builder(rawWeatherTransport) + .circuitBreakerName("mcp-weather") + .circuitBreakerRegistry(registry) + .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); + +McpClientTransport searchTransport = ResilientMcpClientTransport.builder(rawSearchTransport) + .circuitBreakerName("mcp-search") + .circuitBreakerRegistry(registry) + .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); +``` + +> **Note:** Resilience4j registries cache instances by name. If you supply a registry and a config but the name already exists in the registry, the existing instance is returned and your config is **silently ignored** by the registry. `ResilientMcpClientTransport` logs a `WARN` when this happens. Always use unique names when creating multiple transports from a shared registry. + +## Observability + +Circuit breaker state transitions and retry events are logged automatically — the logging listeners are registered when the transport is constructed, so no metrics system is required. 
+ +| Event | Level | Example | +|---|---|---| +| Circuit breaker state change | `INFO` | `MCP circuit breaker 'mcp-weather': CLOSED -> OPEN` | +| Call rejected (circuit open) | `WARN` | `MCP circuit breaker 'mcp-weather' is OPEN, call rejected` | +| Retry attempt | `DEBUG` | `MCP retry 'mcp-weather': attempt #2` | +| Retry exhausted | `WARN` | `MCP retry 'mcp-weather' exhausted after 3 attempt(s)` | + +For Micrometer-based metrics (Prometheus, Datadog, etc.), add `resilience4j-micrometer` to your dependencies and bind the registry to your `MeterRegistry`: + +```java +TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(registry) + .bindTo(meterRegistry); +``` + +## Integration with Google ADK + +When using [Google ADK](https://google.github.io/adk-docs/), the `McpTransportBuilder` interface is the only injection point for custom transport behaviour. Implement it to wrap the raw transport transparently: + +```java +public class ResilientMcpTransportBuilder implements McpTransportBuilder { + + private final McpTransportBuilder delegate; + private final CircuitBreakerRegistry cbRegistry; + + @Override + public McpClientTransport build(Object serverParameters) { + McpClientTransport raw = delegate.build(serverParameters); + return ResilientMcpClientTransport.builder(raw) + .circuitBreakerName("mcp-" + endpointName) + .circuitBreakerRegistry(cbRegistry) + .circuitBreakerConfig(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .retryConfig(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .timeLimiterConfig(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .build(); + } +} +``` + +Pass `ResilientMcpTransportBuilder` wherever ADK expects an `McpTransportBuilder`. `McpSessionManager` calls `build()` lazily on first use, so each server connection gets its own named policy instance. 
+ +## Sample Request Flow + +A normal `sendMessage()` call with all five policies configured: + +``` +caller + └─ Retry (attempt 1) + └─ CircuitBreaker [CLOSED — passes through, records attempt] + └─ RateLimiter [token available — acquires, passes through] + └─ TimeLimiter [8s countdown starts] + └─ Bulkhead [slot available — acquires] + └─ MCP Server ──► response in 200ms + Bulkhead slot released + TimeLimiter cancelled + CircuitBreaker records SUCCESS + Retry: no failure, done +caller receives response +``` + +On a transient failure with retry: + +``` +Retry (attempt 1) → server fails → CB records failure #1 +Retry waits 500ms (Bulkhead slot already released) +Retry (attempt 2) → server succeeds → CB records success #1 +caller receives response +``` + +On persistent failures, the CircuitBreaker opens after the sliding window fills with failures and subsequent calls are rejected immediately without a network round-trip. + +## Building from Source + +```bash +./mvnw clean install -pl mcp-resilience4j -am -DskipTests +``` + +To run the module's tests: + +```bash +./mvnw test -pl mcp-resilience4j +``` diff --git a/mcp-resilience4j/pom.xml b/mcp-resilience4j/pom.xml new file mode 100644 index 000000000..889c3d469 --- /dev/null +++ b/mcp-resilience4j/pom.xml @@ -0,0 +1,193 @@ + + + 4.0.0 + + io.modelcontextprotocol.sdk + mcp-parent + 2.1.1-SNAPSHOT + + mcp-resilience4j + jar + Java MCP SDK Resilience4j Integration + Resilience4j integration for the Java MCP SDK providing circuit breaker, retry, and time limiter support for MCP transport + https://github.com/modelcontextprotocol/java-sdk + + + https://github.com/modelcontextprotocol/java-sdk + scm:git:git://github.com/modelcontextprotocol/java-sdk.git + scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git + + + + 2.2.0 + + + + + io.modelcontextprotocol.sdk + mcp-core + 2.1.1-SNAPSHOT + provided + + + + io.projectreactor + reactor-core + + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + + + 
io.github.resilience4j + resilience4j-circuitbreaker + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-retry + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-bulkhead + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-ratelimiter + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-timelimiter + ${resilience4j.version} + + + + + io.github.resilience4j + resilience4j-reactor + ${resilience4j.version} + + + + + io.github.resilience4j + resilience4j-micrometer + ${resilience4j.version} + true + + + + io.micrometer + micrometer-core + 1.13.0 + true + + + + + jakarta.servlet + jakarta.servlet-api + ${jakarta.servlet.version} + true + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit.version} + test + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + + org.assertj + assertj-core + ${assert4j.version} + test + + + + io.projectreactor + reactor-test + test + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + net.bytebuddy + byte-buddy + ${byte-buddy.version} + test + + + + + + + jackson3 + + true + + + + io.modelcontextprotocol.sdk + mcp-json-jackson3 + 2.1.1-SNAPSHOT + test + + + + + jackson2 + + + io.modelcontextprotocol.sdk + mcp-json-jackson2 + 2.1.1-SNAPSHOT + test + + + + + + diff --git a/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java new file mode 100644 index 000000000..c26a7cbdf --- /dev/null +++ b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java @@ -0,0 +1,335 @@ +/* + * Copyright 2024-2026 the original author or authors. 
+ */ + +package io.modelcontextprotocol.resilience; + +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import io.github.resilience4j.timelimiter.TimeLimiterRegistry; +import io.modelcontextprotocol.spec.McpClientTransport; + +/** + * High-level configuration facade that wires transport-level resilience for MCP clients. + * + *

+ * {@code McpResilienceConfig} provides a fluent DSL for configuring + * {@link ResilientMcpClientTransport} — applying circuit breaking, retry, time limiting, + * rate limiting, and bulkhead isolation at the wire level to all outbound MCP messages. + * + *

+ * Usage example: + * + *

{@code
+ * McpResilienceConfig config = McpResilienceConfig.builder()
+ * 		.transportCircuitBreaker(CircuitBreakerConfig.custom()
+ * 				.slidingWindowSize(10)
+ * 				.failureRateThreshold(50)
+ * 				.waitDurationInOpenState(Duration.ofSeconds(30))
+ * 				.build())
+ * 		.transportRetry(RetryConfig.custom()
+ * 				.maxAttempts(3)
+ * 				.waitDuration(Duration.ofMillis(500))
+ * 				.build())
+ * 		.transportTimeLimiter(TimeLimiterConfig.custom()
+ * 				.timeoutDuration(Duration.ofSeconds(8))
+ * 				.build())
+ * 		.build();
+ *
+ * McpClientTransport resilientTransport = config.wrapTransport(rawTransport);
+ * McpAsyncClient client = McpClient.async(resilientTransport).build();
+ * }
+ * + * @author Pratyay Pandey + * @see ResilientMcpClientTransport + */ +public final class McpResilienceConfig { + + private final CircuitBreakerConfig transportCircuitBreakerConfig; + + private final String transportCircuitBreakerName; + + private final CircuitBreakerRegistry transportCircuitBreakerRegistry; + + private final RetryConfig transportRetryConfig; + + private final String transportRetryName; + + private final RetryRegistry transportRetryRegistry; + + private final TimeLimiterConfig transportTimeLimiterConfig; + + private final String transportTimeLimiterName; + + private final TimeLimiterRegistry transportTimeLimiterRegistry; + + private final RateLimiterConfig transportRateLimiterConfig; + + private final String transportRateLimiterName; + + private final RateLimiterRegistry transportRateLimiterRegistry; + + private final BulkheadConfig transportBulkheadConfig; + + private final String transportBulkheadName; + + private final BulkheadRegistry transportBulkheadRegistry; + + private McpResilienceConfig(Builder builder) { + this.transportCircuitBreakerConfig = builder.transportCircuitBreakerConfig; + this.transportCircuitBreakerName = builder.transportCircuitBreakerName; + this.transportCircuitBreakerRegistry = builder.transportCircuitBreakerRegistry; + this.transportRetryConfig = builder.transportRetryConfig; + this.transportRetryName = builder.transportRetryName; + this.transportRetryRegistry = builder.transportRetryRegistry; + this.transportTimeLimiterConfig = builder.transportTimeLimiterConfig; + this.transportTimeLimiterName = builder.transportTimeLimiterName; + this.transportTimeLimiterRegistry = builder.transportTimeLimiterRegistry; + this.transportRateLimiterConfig = builder.transportRateLimiterConfig; + this.transportRateLimiterName = builder.transportRateLimiterName; + this.transportRateLimiterRegistry = builder.transportRateLimiterRegistry; + this.transportBulkheadConfig = builder.transportBulkheadConfig; + this.transportBulkheadName = 
builder.transportBulkheadName; + this.transportBulkheadRegistry = builder.transportBulkheadRegistry; + } + + /** + * Wraps the given transport with the configured transport-level resilience policies. + * @param transport the raw transport to wrap + * @return a {@link ResilientMcpClientTransport} with circuit breaker, retry, and time + * limiter applied + */ + public McpClientTransport wrapTransport(McpClientTransport transport) { + ResilientMcpClientTransport.Builder builder = ResilientMcpClientTransport.builder(transport); + // Always forward the name whenever either config or registry is set, so that + // a registry-only caller's custom name is not silently dropped. + if (this.transportCircuitBreakerConfig != null || this.transportCircuitBreakerRegistry != null) { + builder.circuitBreakerName(this.transportCircuitBreakerName); + } + if (this.transportCircuitBreakerConfig != null) { + builder.circuitBreakerConfig(this.transportCircuitBreakerConfig); + } + if (this.transportCircuitBreakerRegistry != null) { + builder.circuitBreakerRegistry(this.transportCircuitBreakerRegistry); + } + if (this.transportRetryConfig != null || this.transportRetryRegistry != null) { + builder.retryName(this.transportRetryName); + } + if (this.transportRetryConfig != null) { + builder.retryConfig(this.transportRetryConfig); + } + if (this.transportRetryRegistry != null) { + builder.retryRegistry(this.transportRetryRegistry); + } + if (this.transportTimeLimiterConfig != null || this.transportTimeLimiterRegistry != null) { + builder.timeLimiterName(this.transportTimeLimiterName); + } + if (this.transportTimeLimiterConfig != null) { + builder.timeLimiterConfig(this.transportTimeLimiterConfig); + } + if (this.transportTimeLimiterRegistry != null) { + builder.timeLimiterRegistry(this.transportTimeLimiterRegistry); + } + if (this.transportRateLimiterConfig != null || this.transportRateLimiterRegistry != null) { + builder.rateLimiterName(this.transportRateLimiterName); + } + if 
(this.transportRateLimiterConfig != null) { + builder.rateLimiterConfig(this.transportRateLimiterConfig); + } + if (this.transportRateLimiterRegistry != null) { + builder.rateLimiterRegistry(this.transportRateLimiterRegistry); + } + if (this.transportBulkheadConfig != null || this.transportBulkheadRegistry != null) { + builder.bulkheadName(this.transportBulkheadName); + } + if (this.transportBulkheadConfig != null) { + builder.bulkheadConfig(this.transportBulkheadConfig); + } + if (this.transportBulkheadRegistry != null) { + builder.bulkheadRegistry(this.transportBulkheadRegistry); + } + return builder.build(); + } + + /** + * Creates a new builder for {@link McpResilienceConfig}. + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link McpResilienceConfig}. + */ + public static final class Builder { + + private static final String DEFAULT_TRANSPORT_NAME = "mcp-transport"; + + private CircuitBreakerConfig transportCircuitBreakerConfig; + + private String transportCircuitBreakerName = DEFAULT_TRANSPORT_NAME; + + private CircuitBreakerRegistry transportCircuitBreakerRegistry; + + private RetryConfig transportRetryConfig; + + private String transportRetryName = DEFAULT_TRANSPORT_NAME; + + private RetryRegistry transportRetryRegistry; + + private TimeLimiterConfig transportTimeLimiterConfig; + + private String transportTimeLimiterName = DEFAULT_TRANSPORT_NAME; + + private TimeLimiterRegistry transportTimeLimiterRegistry; + + private RateLimiterConfig transportRateLimiterConfig; + + private String transportRateLimiterName = DEFAULT_TRANSPORT_NAME; + + private RateLimiterRegistry transportRateLimiterRegistry; + + private BulkheadConfig transportBulkheadConfig; + + private String transportBulkheadName = DEFAULT_TRANSPORT_NAME; + + private BulkheadRegistry transportBulkheadRegistry; + + private Builder() { + } + + public Builder transportCircuitBreaker(CircuitBreakerConfig config) { + if (config 
== null) { + throw new IllegalArgumentException("CircuitBreakerConfig must not be null"); + } + this.transportCircuitBreakerConfig = config; + return this; + } + + public Builder transportCircuitBreakerName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Circuit breaker name must not be blank"); + } + this.transportCircuitBreakerName = name; + return this; + } + + public Builder transportCircuitBreakerRegistry(CircuitBreakerRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("CircuitBreakerRegistry must not be null"); + } + this.transportCircuitBreakerRegistry = registry; + return this; + } + + public Builder transportRetry(RetryConfig config) { + if (config == null) { + throw new IllegalArgumentException("RetryConfig must not be null"); + } + this.transportRetryConfig = config; + return this; + } + + public Builder transportRetryName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Retry name must not be blank"); + } + this.transportRetryName = name; + return this; + } + + public Builder transportRetryRegistry(RetryRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RetryRegistry must not be null"); + } + this.transportRetryRegistry = registry; + return this; + } + + public Builder transportTimeLimiter(TimeLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("TimeLimiterConfig must not be null"); + } + this.transportTimeLimiterConfig = config; + return this; + } + + public Builder transportTimeLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("TimeLimiter name must not be blank"); + } + this.transportTimeLimiterName = name; + return this; + } + + public Builder transportTimeLimiterRegistry(TimeLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("TimeLimiterRegistry must not be null"); + } + 
this.transportTimeLimiterRegistry = registry; + return this; + } + + public Builder transportRateLimiter(RateLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("RateLimiterConfig must not be null"); + } + this.transportRateLimiterConfig = config; + return this; + } + + public Builder transportRateLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("RateLimiter name must not be blank"); + } + this.transportRateLimiterName = name; + return this; + } + + public Builder transportRateLimiterRegistry(RateLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RateLimiterRegistry must not be null"); + } + this.transportRateLimiterRegistry = registry; + return this; + } + + public Builder transportBulkhead(BulkheadConfig config) { + if (config == null) { + throw new IllegalArgumentException("BulkheadConfig must not be null"); + } + this.transportBulkheadConfig = config; + return this; + } + + public Builder transportBulkheadName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Bulkhead name must not be blank"); + } + this.transportBulkheadName = name; + return this; + } + + public Builder transportBulkheadRegistry(BulkheadRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("BulkheadRegistry must not be null"); + } + this.transportBulkheadRegistry = registry; + return this; + } + + public McpResilienceConfig build() { + return new McpResilienceConfig(this); + } + + } + +} diff --git a/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java new file mode 100644 index 000000000..907def3a6 --- /dev/null +++ b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java @@ -0,0 +1,607 @@ +/* + * Copyright 
2024-2026 the original author or authors. + */ + +package io.modelcontextprotocol.resilience; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; +import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator; +import io.github.resilience4j.reactor.retry.RetryOperator; +import io.github.resilience4j.reactor.timelimiter.TimeLimiterOperator; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; +import io.github.resilience4j.timelimiter.TimeLimiter; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import io.github.resilience4j.timelimiter.TimeLimiterRegistry; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * A decorator for {@link McpClientTransport} that applies Resilience4j policies to all + * outbound MCP messages and connection attempts. + * + *

+ * Wraps any {@link McpClientTransport} implementation and applies whichever of circuit + * breaking, retry, rate limiting, time limiting, and bulkhead isolation are configured at + * the wire level, before requests reach the server. This is the lowest-level resilience + * point in the MCP call graph: + * + *

+ * McpAsyncClient.callTool()
+ *   └─> McpClientSession.sendRequest()
+ *        └─> [ResilientMcpClientTransport] ← policies applied here
+ *             └─> HttpClientStreamableHttpTransport / StdioClientTransport
+ * 
+ * + *

+ * The standard policy hierarchy for {@code sendMessage} (outermost to innermost) is: + * Retry → CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead. Bulkhead is innermost so + * a slot is only held during active execution, not during retry backoff sleeps. + * RateLimiter is inside Retry so every attempt — including retries — consumes a token, + * accurately reflecting the load the downstream service actually sees. + * + *

+ * Bulkhead, RateLimiter, and TimeLimiter are intentionally not applied to + * {@code connect()} — session establishment should never be throttled, and it may + * legitimately take longer than a single request timeout. + * + *

+ * Usage example: + * + *

{@code
+ * McpClientTransport transport = ResilientMcpClientTransport.builder(rawTransport)
+ * 		.circuitBreakerConfig(CircuitBreakerConfig.custom()
+ * 				.slidingWindowSize(10)
+ * 				.failureRateThreshold(50)
+ * 				.waitDurationInOpenState(Duration.ofSeconds(30))
+ * 				.build())
+ * 		.retryConfig(RetryConfig.custom()
+ * 				.maxAttempts(3)
+ * 				.waitDuration(Duration.ofMillis(500))
+ * 				.build())
+ * 		.timeLimiterConfig(TimeLimiterConfig.custom()
+ * 				.timeoutDuration(Duration.ofSeconds(8))
+ * 				.build())
+ * 		.rateLimiterConfig(RateLimiterConfig.custom()
+ * 				.limitForPeriod(10)
+ * 				.limitRefreshPeriod(Duration.ofSeconds(1))
+ * 				.build())
+ * 		.bulkheadConfig(BulkheadConfig.custom()
+ * 				.maxConcurrentCalls(5)
+ * 				.build())
+ * 		.build();
+ *
+ * McpAsyncClient client = McpClient.async(transport).build();
+ * }
+ *
+ * @author Pratyay Pandey
+ * @see McpResilienceConfig
+ */
+public final class ResilientMcpClientTransport implements McpClientTransport {
+
+	private static final Logger logger = LoggerFactory.getLogger(ResilientMcpClientTransport.class);
+
+	private static final String DEFAULT_NAME = "mcp-transport";
+
+	private final McpClientTransport delegate;
+
+	private final CircuitBreaker circuitBreaker;
+
+	private final Retry retry;
+
+	private final TimeLimiter timeLimiter;
+
+	private final RateLimiter rateLimiter;
+
+	private final Bulkhead bulkhead;
+
+	private ResilientMcpClientTransport(McpClientTransport delegate, CircuitBreaker circuitBreaker, Retry retry,
+			TimeLimiter timeLimiter, RateLimiter rateLimiter, Bulkhead bulkhead) {
+		this.delegate = delegate;
+		this.circuitBreaker = circuitBreaker;
+		this.retry = retry;
+		this.timeLimiter = timeLimiter;
+		this.rateLimiter = rateLimiter;
+		this.bulkhead = bulkhead;
+		// Event listeners only log; any policy left unconfigured is null and skipped.
+		if (circuitBreaker != null) {
+			circuitBreaker.getEventPublisher()
+				.onStateTransition(e -> logger.info("MCP circuit breaker '{}': {} -> {}", circuitBreaker.getName(),
+						e.getStateTransition().getFromState(), e.getStateTransition().getToState()))
+				.onCallNotPermitted(
+						e -> logger.warn("MCP circuit breaker '{}' is OPEN, call rejected", circuitBreaker.getName()));
+		}
+		if (retry != null) {
+			retry.getEventPublisher()
+				.onRetry(
+						e -> logger.debug("MCP retry '{}': attempt #{}", retry.getName(), e.getNumberOfRetryAttempts()))
+				.onError(e -> logger.warn("MCP retry '{}' exhausted after {} attempt(s)", retry.getName(),
+						e.getNumberOfRetryAttempts()));
+		}
+	}
+
+	/**
+	 * Sends a message through the delegate transport, applying configured resilience
+	 * policies. The standard hierarchy (outermost to innermost) is: Retry →
+	 * CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead.
+	 *
+	 * <p>
+	 * Bulkhead is innermost so a slot is held only during active execution — during a
+	 * retry backoff sleep the slot is released back to the pool. RateLimiter is inside
+	 * Retry so every retry attempt consumes a token, accurately reflecting the request
+	 * rate the downstream service actually sees.
+	 * @param message the JSON-RPC message to send
+	 * @return a {@link Mono} that completes when the message is sent or fails after
+	 * exhausting resilience policies
+	 */
+	@Override
+	public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
+		// Mono.defer ensures delegate.sendMessage() is called on each retry attempt,
+		// not just once at assembly time.
+		// Operators are applied innermost-first: each transformDeferred wraps the
+		// previous one, so the last one applied becomes the outermost subscriber.
+		Mono<Void> op = Mono.defer(() -> this.delegate.sendMessage(message));
+		if (this.bulkhead != null) {
+			op = op.transformDeferred(BulkheadOperator.of(this.bulkhead));
+		}
+		if (this.timeLimiter != null) {
+			op = op.transformDeferred(TimeLimiterOperator.of(this.timeLimiter));
+		}
+		if (this.rateLimiter != null) {
+			op = op.transformDeferred(RateLimiterOperator.of(this.rateLimiter));
+		}
+		if (this.circuitBreaker != null) {
+			op = op.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker));
+		}
+		if (this.retry != null) {
+			op = op.transformDeferred(RetryOperator.of(this.retry));
+		}
+		return op;
+	}
+
+	/**
+	 * Connects to the MCP server, applying circuit breaker and retry policies to the
+	 * connection attempt. The time limiter is not applied here as connection setup may
+	 * legitimately take longer than a single request timeout.
+	 * @param handler the incoming message handler
+	 * @return a {@link Mono} that completes when the transport is ready
+	 */
+	@Override
+	public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
+		// Deferred so that every retry attempt re-invokes delegate.connect(handler).
+		Mono<Void> connectOp = Mono.defer(() -> this.delegate.connect(handler));
+		if (this.circuitBreaker != null) {
+			connectOp = connectOp.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker));
+		}
+		if (this.retry != null) {
+			connectOp = connectOp.transformDeferred(RetryOperator.of(this.retry));
+		}
+		return connectOp;
+	}
+
+	@Override
+	public void setExceptionHandler(Consumer<Throwable> handler) {
+		this.delegate.setExceptionHandler(handler);
+	}
+
+	@Override
+	public Mono<Void> closeGracefully() {
+		return this.delegate.closeGracefully();
+	}
+
+	@Override
+	public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
+		return this.delegate.unmarshalFrom(data, typeRef);
+	}
+
+	@Override
+	public List<String> protocolVersions() {
+		return this.delegate.protocolVersions();
+	}
+
+	/**
+	 * Returns the underlying {@link CircuitBreaker} for monitoring or metric binding.
+	 * @return the circuit breaker, or {@code null} if not configured
+	 */
+	public CircuitBreaker getCircuitBreaker() {
+		return this.circuitBreaker;
+	}
+
+	/**
+	 * Returns the underlying {@link Retry} policy.
+	 * @return the retry policy, or {@code null} if not configured
+	 */
+	public Retry getRetry() {
+		return this.retry;
+	}
+
+	/**
+	 * Returns the underlying {@link TimeLimiter} policy.
+	 * @return the time limiter, or {@code null} if not configured
+	 */
+	public TimeLimiter getTimeLimiter() {
+		return this.timeLimiter;
+	}
+
+	/**
+	 * Returns the underlying {@link RateLimiter} policy.
+	 * @return the rate limiter, or {@code null} if not configured
+	 */
+	public RateLimiter getRateLimiter() {
+		return this.rateLimiter;
+	}
+
+	/**
+	 * Returns the underlying {@link Bulkhead} policy.
+	 * @return the bulkhead, or {@code null} if not configured
+	 */
+	public Bulkhead getBulkhead() {
+		return this.bulkhead;
+	}
+
+	/**
+	 * Creates a new builder for {@link ResilientMcpClientTransport}.
+	 * @param delegate the transport to wrap
+	 * @return a new builder instance
+	 */
+	public static Builder builder(McpClientTransport delegate) {
+		return new Builder(delegate);
+	}
+
+	/**
+	 * Builder for {@link ResilientMcpClientTransport}.
+	 */
+	public static final class Builder {
+
+		private final McpClientTransport delegate;
+
+		private CircuitBreakerConfig circuitBreakerConfig;
+
+		private String circuitBreakerName = DEFAULT_NAME;
+
+		private CircuitBreakerRegistry circuitBreakerRegistry;
+
+		private RetryConfig retryConfig;
+
+		private String retryName = DEFAULT_NAME;
+
+		private RetryRegistry retryRegistry;
+
+		private TimeLimiterConfig timeLimiterConfig;
+
+		private String timeLimiterName = DEFAULT_NAME;
+
+		private TimeLimiterRegistry timeLimiterRegistry;
+
+		private RateLimiterConfig rateLimiterConfig;
+
+		private String rateLimiterName = DEFAULT_NAME;
+
+		private RateLimiterRegistry rateLimiterRegistry;
+
+		private BulkheadConfig bulkheadConfig;
+
+		private String bulkheadName = DEFAULT_NAME;
+
+		private BulkheadRegistry bulkheadRegistry;
+
+		private Builder(McpClientTransport delegate) {
+			if (delegate == null) {
+				throw new IllegalArgumentException("Delegate transport must not be null");
+			}
+			this.delegate = delegate;
+		}
+
+		/**
+		 * Configures a circuit breaker with the given configuration. Unless a registry
+		 * is supplied via {@link #circuitBreakerRegistry}, a circuit breaker registry is
+		 * created internally using this configuration.
+		 * @param config the circuit breaker configuration
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code config} is {@code null}
+		 */
+		public Builder circuitBreakerConfig(CircuitBreakerConfig config) {
+			if (config == null) {
+				throw new IllegalArgumentException("CircuitBreakerConfig must not be null");
+			}
+			this.circuitBreakerConfig = config;
+			return this;
+		}
+
+		/**
+		 * Sets the name used to register the circuit breaker in its registry. Useful when
+		 * exposing the registry to a Hystrix-compatible metrics stream.
+		 * @param name the circuit breaker name
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code name} is {@code null} or blank
+		 */
+		public Builder circuitBreakerName(String name) {
+			if (name == null || name.isBlank()) {
+				throw new IllegalArgumentException("Circuit breaker name must not be blank");
+			}
+			this.circuitBreakerName = name;
+			return this;
+		}
+
+		/**
+		 * Provides an existing {@link CircuitBreakerRegistry} from which the circuit
+		 * breaker will be retrieved. When provided, the registry-level default config is
+		 * used unless {@link #circuitBreakerConfig} is also set.
+		 * @param registry the circuit breaker registry
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code registry} is {@code null}
+		 */
+		public Builder circuitBreakerRegistry(CircuitBreakerRegistry registry) {
+			if (registry == null) {
+				throw new IllegalArgumentException("CircuitBreakerRegistry must not be null");
+			}
+			this.circuitBreakerRegistry = registry;
+			return this;
+		}
+
+		/**
+		 * Configures a retry policy.
+		 * @param config the retry configuration
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code config} is {@code null}
+		 */
+		public Builder retryConfig(RetryConfig config) {
+			if (config == null) {
+				throw new IllegalArgumentException("RetryConfig must not be null");
+			}
+			this.retryConfig = config;
+			return this;
+		}
+
+		/**
+		 * Sets the name used to register the retry policy in its registry.
+		 * @param name the retry name
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code name} is {@code null} or blank
+		 */
+		public Builder retryName(String name) {
+			if (name == null || name.isBlank()) {
+				throw new IllegalArgumentException("Retry name must not be blank");
+			}
+			this.retryName = name;
+			return this;
+		}
+
+		/**
+		 * Provides an existing {@link RetryRegistry}. Its default config is used unless
+		 * {@link #retryConfig} is also set.
+		 * @param registry the retry registry
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code registry} is {@code null}
+		 */
+		public Builder retryRegistry(RetryRegistry registry) {
+			if (registry == null) {
+				throw new IllegalArgumentException("RetryRegistry must not be null");
+			}
+			this.retryRegistry = registry;
+			return this;
+		}
+
+		/**
+		 * Configures a time limiter applied to each individual {@code sendMessage} call.
+		 * @param config the time limiter configuration
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code config} is {@code null}
+		 */
+		public Builder timeLimiterConfig(TimeLimiterConfig config) {
+			if (config == null) {
+				throw new IllegalArgumentException("TimeLimiterConfig must not be null");
+			}
+			this.timeLimiterConfig = config;
+			return this;
+		}
+
+		/**
+		 * Sets the name for the time limiter in its registry.
+		 * @param name the time limiter name
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code name} is {@code null} or blank
+		 */
+		public Builder timeLimiterName(String name) {
+			if (name == null || name.isBlank()) {
+				throw new IllegalArgumentException("TimeLimiter name must not be blank");
+			}
+			this.timeLimiterName = name;
+			return this;
+		}
+
+		/**
+		 * Provides an existing {@link TimeLimiterRegistry}. Its default config is used
+		 * unless {@link #timeLimiterConfig} is also set.
+		 * @param registry the time limiter registry
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code registry} is {@code null}
+		 */
+		public Builder timeLimiterRegistry(TimeLimiterRegistry registry) {
+			if (registry == null) {
+				throw new IllegalArgumentException("TimeLimiterRegistry must not be null");
+			}
+			this.timeLimiterRegistry = registry;
+			return this;
+		}
+
+		/**
+		 * Configures a rate limiter applied to each {@code sendMessage} call.
+		 * @param config the rate limiter configuration
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code config} is {@code null}
+		 */
+		public Builder rateLimiterConfig(RateLimiterConfig config) {
+			if (config == null) {
+				throw new IllegalArgumentException("RateLimiterConfig must not be null");
+			}
+			this.rateLimiterConfig = config;
+			return this;
+		}
+
+		/**
+		 * Sets the name for the rate limiter in its registry.
+		 * @param name the rate limiter name
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code name} is {@code null} or blank
+		 */
+		public Builder rateLimiterName(String name) {
+			if (name == null || name.isBlank()) {
+				throw new IllegalArgumentException("RateLimiter name must not be blank");
+			}
+			this.rateLimiterName = name;
+			return this;
+		}
+
+		/**
+		 * Provides an existing {@link RateLimiterRegistry}. Its default config is used
+		 * unless {@link #rateLimiterConfig} is also set.
+		 * @param registry the rate limiter registry
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code registry} is {@code null}
+		 */
+		public Builder rateLimiterRegistry(RateLimiterRegistry registry) {
+			if (registry == null) {
+				throw new IllegalArgumentException("RateLimiterRegistry must not be null");
+			}
+			this.rateLimiterRegistry = registry;
+			return this;
+		}
+
+		/**
+		 * Configures a bulkhead limiting concurrent in-flight {@code sendMessage} calls.
+		 * @param config the bulkhead configuration
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code config} is {@code null}
+		 */
+		public Builder bulkheadConfig(BulkheadConfig config) {
+			if (config == null) {
+				throw new IllegalArgumentException("BulkheadConfig must not be null");
+			}
+			this.bulkheadConfig = config;
+			return this;
+		}
+
+		/**
+		 * Sets the name for the bulkhead in its registry.
+		 * @param name the bulkhead name
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code name} is {@code null} or blank
+		 */
+		public Builder bulkheadName(String name) {
+			if (name == null || name.isBlank()) {
+				throw new IllegalArgumentException("Bulkhead name must not be blank");
+			}
+			this.bulkheadName = name;
+			return this;
+		}
+
+		/**
+		 * Provides an existing {@link BulkheadRegistry}. Its default config is used
+		 * unless {@link #bulkheadConfig} is also set.
+		 * @param registry the bulkhead registry
+		 * @return this builder
+		 * @throws IllegalArgumentException if {@code registry} is {@code null}
+		 */
+		public Builder bulkheadRegistry(BulkheadRegistry registry) {
+			if (registry == null) {
+				throw new IllegalArgumentException("BulkheadRegistry must not be null");
+			}
+			this.bulkheadRegistry = registry;
+			return this;
+		}
+
+		/**
+		 * Builds the {@link ResilientMcpClientTransport}. A policy is created only when
+		 * its config or its registry was supplied; otherwise it is left {@code null} and
+		 * skipped at call time.
+		 * @return a new instance wrapping the delegate with the configured policies
+		 */
+		public ResilientMcpClientTransport build() {
+			CircuitBreaker cb = resolveCircuitBreaker();
+			Retry retry = resolveRetry();
+			TimeLimiter timeLimiter = resolveTimeLimiter();
+			RateLimiter rateLimiter = resolveRateLimiter();
+			Bulkhead bulkhead = resolveBulkhead();
+			return new ResilientMcpClientTransport(this.delegate, cb, retry, timeLimiter, rateLimiter, bulkhead);
+		}
+
+		// Resolves the circuit breaker, or null when neither config nor registry was set.
+		private CircuitBreaker resolveCircuitBreaker() {
+			if (this.circuitBreakerConfig == null && this.circuitBreakerRegistry == null) {
+				return null;
+			}
+			CircuitBreakerRegistry registry = (this.circuitBreakerRegistry != null) ? this.circuitBreakerRegistry
+					: CircuitBreakerRegistry.of(this.circuitBreakerConfig);
+			if (this.circuitBreakerConfig != null && this.circuitBreakerRegistry != null
+					&& registry.find(this.circuitBreakerName).isPresent()) {
+				logger.warn(
+						"Circuit breaker '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.",
+						this.circuitBreakerName);
+			}
+			if (this.circuitBreakerConfig == null) {
+				// Registry-only setup: inherit the registry's own default config
+				// instead of overriding it with the library defaults.
+				return registry.circuitBreaker(this.circuitBreakerName);
+			}
+			return registry.circuitBreaker(this.circuitBreakerName, this.circuitBreakerConfig);
+		}
+
+		// Resolves the retry policy, or null when neither config nor registry was set.
+		private Retry resolveRetry() {
+			if (this.retryConfig == null && this.retryRegistry == null) {
+				return null;
+			}
+			RetryRegistry registry = (this.retryRegistry != null) ? this.retryRegistry
+					: RetryRegistry.of(this.retryConfig);
+			if (this.retryConfig != null && this.retryRegistry != null && registry.find(this.retryName).isPresent()) {
+				logger.warn(
+						"Retry '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.",
+						this.retryName);
+			}
+			if (this.retryConfig == null) {
+				// Registry-only setup: inherit the registry's own default config.
+				return registry.retry(this.retryName);
+			}
+			return registry.retry(this.retryName, this.retryConfig);
+		}
+
+		// Resolves the time limiter, or null when neither config nor registry was set.
+		private TimeLimiter resolveTimeLimiter() {
+			if (this.timeLimiterConfig == null && this.timeLimiterRegistry == null) {
+				return null;
+			}
+			TimeLimiterRegistry registry = (this.timeLimiterRegistry != null) ? this.timeLimiterRegistry
+					: TimeLimiterRegistry.of(this.timeLimiterConfig);
+			if (this.timeLimiterConfig != null && this.timeLimiterRegistry != null
+					&& registry.find(this.timeLimiterName).isPresent()) {
+				logger.warn(
+						"TimeLimiter '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.",
+						this.timeLimiterName);
+			}
+			if (this.timeLimiterConfig == null) {
+				// Registry-only setup: inherit the registry's own default config.
+				return registry.timeLimiter(this.timeLimiterName);
+			}
+			return registry.timeLimiter(this.timeLimiterName, this.timeLimiterConfig);
+		}
+
+		// Resolves the rate limiter, or null when neither config nor registry was set.
+		private RateLimiter resolveRateLimiter() {
+			if (this.rateLimiterConfig == null && this.rateLimiterRegistry == null) {
+				return null;
+			}
+			RateLimiterRegistry registry = (this.rateLimiterRegistry != null) ? this.rateLimiterRegistry
+					: RateLimiterRegistry.of(this.rateLimiterConfig);
+			if (this.rateLimiterConfig != null && this.rateLimiterRegistry != null
+					&& registry.find(this.rateLimiterName).isPresent()) {
+				logger.warn(
+						"RateLimiter '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.",
+						this.rateLimiterName);
+			}
+			if (this.rateLimiterConfig == null) {
+				// Registry-only setup: inherit the registry's own default config.
+				return registry.rateLimiter(this.rateLimiterName);
+			}
+			return registry.rateLimiter(this.rateLimiterName, this.rateLimiterConfig);
+		}
+
+		// Resolves the bulkhead, or null when neither config nor registry was set.
+		private Bulkhead resolveBulkhead() {
+			if (this.bulkheadConfig == null && this.bulkheadRegistry == null) {
+				return null;
+			}
+			BulkheadRegistry registry = (this.bulkheadRegistry != null) ? this.bulkheadRegistry
+					: BulkheadRegistry.of(this.bulkheadConfig);
+			if (this.bulkheadConfig != null && this.bulkheadRegistry != null
+					&& registry.find(this.bulkheadName).isPresent()) {
+				logger.warn(
+						"Bulkhead '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.",
+						this.bulkheadName);
+			}
+			if (this.bulkheadConfig == null) {
+				// Registry-only setup: inherit the registry's own default config.
+				return registry.bulkhead(this.bulkheadName);
+			}
+			return registry.bulkhead(this.bulkheadName, this.bulkheadConfig);
+		}
+
+	}
+
+}
diff --git a/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java b/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java
new file mode 100644
index 000000000..f21a5d7b8
--- /dev/null
+++ b/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2024-2026 the original author or authors.
+ */
+
+package io.modelcontextprotocol.resilience;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
+import io.github.resilience4j.retry.RetryConfig;
+import io.github.resilience4j.timelimiter.TimeLimiterConfig;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ResilientMcpClientTransport}.
+ */
+@ExtendWith(MockitoExtension.class)
+class ResilientMcpClientTransportTests {
+
+	@Mock
+	private McpClientTransport delegateTransport;
+
+	private McpSchema.JSONRPCMessage testMessage;
+
+	@BeforeEach
+	void setUp() {
+		this.testMessage = new McpSchema.JSONRPCRequest("tools/call", "1", null);
+	}
+
+	@Test
+	void builderRequiresNonNullDelegate() {
+		assertThatThrownBy(() -> ResilientMcpClientTransport.builder(null).build())
+			.isInstanceOf(IllegalArgumentException.class)
+			.hasMessageContaining("Delegate transport must not be null");
+	}
+
+	@Test
+	void sendMessageDelegatesToUnderlyingTransportWhenNoPolicies() {
+		when(this.delegateTransport.sendMessage(any())).thenReturn(Mono.empty());
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+
+		StepVerifier.create(transport.sendMessage(this.testMessage)).verifyComplete();
+
+		verify(this.delegateTransport).sendMessage(this.testMessage);
+	}
+
+	@Test
+	void sendMessageAppliesRetryOnTransientFailure() {
+		AtomicInteger callCount = new AtomicInteger(0);
+		when(this.delegateTransport.sendMessage(any())).thenAnswer(inv -> {
+			int count = callCount.incrementAndGet();
+			if (count < 3) {
+				return Mono.error(new RuntimeException("transient error"));
+			}
+			return Mono.empty();
+		});
+
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport)
+			.retryConfig(RetryConfig.custom().maxAttempts(3).waitDuration(Duration.ZERO).build())
+			.build();
+
+		StepVerifier.create(transport.sendMessage(this.testMessage)).verifyComplete();
+
+		assertThat(callCount.get()).isEqualTo(3);
+	}
+
+	@Test
+	void sendMessageFailsAfterMaxRetryAttempts() {
+		when(this.delegateTransport.sendMessage(any()))
+			.thenReturn(Mono.error(new RuntimeException("persistent error")));
+
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport)
+			.retryConfig(RetryConfig.custom().maxAttempts(2).waitDuration(Duration.ZERO).build())
+			.build();
+
+		StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(RuntimeException.class);
+
+		verify(this.delegateTransport, times(2)).sendMessage(this.testMessage);
+	}
+
+	@Test
+	void sendMessageOpensCircuitBreakerAfterThresholdExceeded() {
+		when(this.delegateTransport.sendMessage(any())).thenReturn(Mono.error(new RuntimeException("server error")));
+
+		CircuitBreakerConfig cbConfig = CircuitBreakerConfig.custom()
+			.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
+			.slidingWindowSize(4)
+			.minimumNumberOfCalls(4)
+			.failureRateThreshold(100)
+			.build();
+
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport)
+			.circuitBreakerConfig(cbConfig)
+			.build();
+
+		// Force the circuit breaker open by exhausting the sliding window
+		for (int i = 0; i < 4; i++) {
+			StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(RuntimeException.class);
+		}
+
+		// Circuit should now be OPEN — next call must short-circuit
+		assertThat(transport.getCircuitBreaker().getState()).isEqualTo(CircuitBreaker.State.OPEN);
+
+		StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(CallNotPermittedException.class);
+	}
+
+	@Test
+	void sendMessageTimeLimiterCancelsSlowOperations() {
+		// thenAnswer defers Mono.delay creation to call time so it is constructed inside
+		// the withVirtualTime supplier and uses the virtual scheduler, not real time.
+		when(this.delegateTransport.sendMessage(any())).thenAnswer(inv -> Mono.delay(Duration.ofSeconds(10)).then());
+
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport)
+			.timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(100)).build())
+			.build();
+
+		StepVerifier.withVirtualTime(() -> transport.sendMessage(this.testMessage))
+			.thenAwait(Duration.ofSeconds(1))
+			.verifyError(java.util.concurrent.TimeoutException.class);
+	}
+
+	@Test
+	void connectDelegatesToUnderlyingTransport() {
+		when(this.delegateTransport.connect(any())).thenReturn(Mono.empty());
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+		Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler = m -> m;
+
+		StepVerifier.create(transport.connect(handler)).verifyComplete();
+
+		verify(this.delegateTransport).connect(handler);
+	}
+
+	@Test
+	void closeGracefullyDelegatesToUnderlyingTransport() {
+		when(this.delegateTransport.closeGracefully()).thenReturn(Mono.empty());
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+
+		StepVerifier.create(transport.closeGracefully()).verifyComplete();
+
+		verify(this.delegateTransport).closeGracefully();
+	}
+
+	@Test
+	void setExceptionHandlerDelegatesToUnderlyingTransport() {
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+		Consumer<Throwable> handler = ex -> {
+		};
+
+		transport.setExceptionHandler(handler);
+
+		verify(this.delegateTransport).setExceptionHandler(handler);
+	}
+
+	@Test
+	void unmarshalFromDelegatesToUnderlyingTransport() {
+		TypeRef<String> typeRef = new TypeRef<>() {
+		};
+		when(this.delegateTransport.unmarshalFrom(any(), any())).thenReturn("result");
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+
+		String result = transport.unmarshalFrom("data", typeRef);
+
+		assertThat(result).isEqualTo("result");
+		verify(this.delegateTransport).unmarshalFrom("data", typeRef);
+	}
+
+	@Test
+	void protocolVersionsDelegatesToUnderlyingTransport() {
+		when(this.delegateTransport.protocolVersions()).thenReturn(List.of("2025-03-26"));
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+
+		List<String> versions = transport.protocolVersions();
+
+		assertThat(versions).containsExactly("2025-03-26");
+	}
+
+	@Test
+	void getCircuitBreakerReturnsNullWhenNotConfigured() {
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build();
+		assertThat(transport.getCircuitBreaker()).isNull();
+	}
+
+	@Test
+	void getCircuitBreakerReturnsInstanceWhenConfigured() {
+		ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport)
+			.circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
+			.build();
+		assertThat(transport.getCircuitBreaker()).isNotNull();
+	}
+
+}
diff --git a/mcp-test/pom.xml b/mcp-test/pom.xml index 40cf42d36..4ef5043bc 100644 --- a/mcp-test/pom.xml +++ b/mcp-test/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-test jar @@ -24,7 +24,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT @@ -159,7 +159,7 @@ io.modelcontextprotocol.sdk mcp-json-jackson3 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT test @@ -170,7 +170,7 @@ io.modelcontextprotocol.sdk mcp-json-jackson2 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT test
diff --git a/mcp/pom.xml b/mcp/pom.xml index 8749bb0d2..63b5b9f93 100644 --- a/mcp/pom.xml +++ b/mcp/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp jar @@ -25,13 +25,13 @@ io.modelcontextprotocol.sdk mcp-json-jackson3 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT
diff --git a/pom.xml b/pom.xml index f60f3918b..0e7f63fce 100644
--- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT pom https://github.com/modelcontextprotocol/java-sdk @@ -109,6 +109,7 @@ mcp-json-jackson2 mcp-json-jackson3 mcp-test + mcp-resilience4j conformance-tests