Skip to content

Commit bea3244

Browse files
Poggeccicopybara-github
authored andcommitted
feat: add support for Streamable HTTP Connections to MCP Tools
PiperOrigin-RevId: 802274756
1 parent 117f36d commit bea3244

4 files changed

Lines changed: 145 additions & 4 deletions

File tree

core/src/main/java/com/google/adk/tools/mcp/DefaultMcpTransportBuilder.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22

33
import com.google.common.collect.ImmutableMap;
44
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
5+
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
56
import io.modelcontextprotocol.client.transport.ServerParameters;
67
import io.modelcontextprotocol.client.transport.StdioClientTransport;
78
import io.modelcontextprotocol.spec.McpClientTransport;
89
import java.util.Collection;
910
import java.util.Optional;
11+
import reactor.core.publisher.Mono;
1012

1113
/**
1214
* The default builder for creating MCP client transports. Supports StdioClientTransport based on
13-
* {@link ServerParameters} and the standard HttpClientSseClientTransport based on {@link
14-
* SseServerParameters}.
15+
* {@link ServerParameters}, HttpClientSseClientTransport based on {@link SseServerParameters}, and
16+
* HttpClientStreamableHttpTransport based on {@link StreamableHttpServerParameters}.
1517
*/
1618
public class DefaultMcpTransportBuilder implements McpTransportBuilder {
1719

@@ -37,10 +39,21 @@ public McpClientTransport build(Object connectionParams) {
3739
.map(Object::toString)
3840
.orElse(""))))
3941
.build();
42+
} else if (connectionParams instanceof StreamableHttpServerParameters streamableParams) {
43+
return HttpClientStreamableHttpTransport.builder(streamableParams.url())
44+
.connectTimeout(streamableParams.timeout())
45+
// HttpClientStreamableHttpTransport uses connectTimeout for general HTTP ops
46+
// and sseReadTimeout for the SSE stream part.
47+
.asyncHttpRequestCustomizer(
48+
(builder, method, uri, body) -> {
49+
streamableParams.headers().forEach((key, value) -> builder.header(key, value));
50+
return Mono.just(builder);
51+
})
52+
.build();
4053
} else {
4154
throw new IllegalArgumentException(
42-
"DefaultMcpTransportBuilder supports only ServerParameters or SseServerParameters, but"
43-
+ " got "
55+
"DefaultMcpTransportBuilder supports only ServerParameters, SseServerParameters, or"
56+
+ " StreamableHttpServerParameters, but got "
4457
+ connectionParams.getClass().getName());
4558
}
4659
}

core/src/main/java/com/google/adk/tools/mcp/McpSessionManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ public static McpSyncClient initializeSession(
6565
if (connectionParams instanceof SseServerParameters sseServerParams) {
6666
initializationTimeout = sseServerParams.timeout();
6767
requestTimeout = sseServerParams.sseReadTimeout();
68+
} else if (connectionParams instanceof StreamableHttpServerParameters streamableParams) {
69+
initializationTimeout = streamableParams.timeout();
70+
requestTimeout = streamableParams.sseReadTimeout();
6871
}
6972
McpSyncClient client =
7073
McpClient.sync(transport)
@@ -95,6 +98,9 @@ public static McpAsyncClient initializeAsyncSession(
9598
if (connectionParams instanceof SseServerParameters sseServerParams) {
9699
initializationTimeout = sseServerParams.timeout();
97100
requestTimeout = sseServerParams.sseReadTimeout();
101+
} else if (connectionParams instanceof StreamableHttpServerParameters streamableParams) {
102+
initializationTimeout = streamableParams.timeout();
103+
requestTimeout = streamableParams.sseReadTimeout();
98104
}
99105
return McpClient.async(transport)
100106
.initializationTimeout(

core/src/main/java/com/google/adk/tools/mcp/McpToolset.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,34 @@ public McpToolset(
165165
this.toolFilter = toolFilter;
166166
}
167167

168+
/**
169+
* Initializes the McpToolset with Steamable HTTP server parameters.
170+
*
171+
* @param connectionParams The Streamable HTTP connection parameters to the MCP server.
172+
* @param objectMapper An ObjectMapper instance for parsing schemas.
173+
* @param toolFilter An Optional containing either a ToolPredicate or a List of tool names.
174+
*/
175+
public McpToolset(
176+
StreamableHttpServerParameters connectionParams,
177+
ObjectMapper objectMapper,
178+
Optional<Object> toolFilter) {
179+
Objects.requireNonNull(connectionParams);
180+
Objects.requireNonNull(objectMapper);
181+
this.objectMapper = objectMapper;
182+
this.mcpSessionManager = new McpSessionManager(connectionParams);
183+
this.toolFilter = toolFilter;
184+
}
185+
186+
/**
187+
* Initializes the McpToolset with Streamable HTTP server parameters, using the ObjectMapper used
188+
* across the ADK and no tool filter.
189+
*
190+
* @param connectionParams The Streamable HTTP connection parameters to the MCP server.
191+
*/
192+
public McpToolset(StreamableHttpServerParameters connectionParams) {
193+
this(connectionParams, JsonBaseModel.getMapper(), Optional.empty());
194+
}
195+
168196
@Override
169197
public Flowable<BaseTool> getTools(ReadonlyContext readonlyContext) {
170198
return Flowable.fromCallable(
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.tools.mcp;
18+
19+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
20+
import io.modelcontextprotocol.util.Assert;
21+
import java.time.Duration;
22+
import java.util.Collections;
23+
import java.util.Map;
24+
25+
/**
26+
* Server parameters for Streamable HTTP client transport.
27+
*
28+
* @param url The base URL for the MCP Streamable HTTP server.
29+
* @param headers Optional headers to include in requests.
30+
* @param timeout Timeout for HTTP operations (default: 30 seconds).
31+
* @param sseReadTimeout Timeout for reading data from the SSE stream (default: 5 minutes).
32+
* @param terminateOnClose Whether to terminate the session on close (default: true).
33+
*/
34+
public record StreamableHttpServerParameters(
35+
String url,
36+
Map<String, String> headers,
37+
Duration timeout,
38+
Duration sseReadTimeout,
39+
boolean terminateOnClose) {
40+
41+
public StreamableHttpServerParameters {
42+
Assert.hasText(url, "url must not be empty");
43+
headers = headers == null ? Collections.emptyMap() : headers;
44+
timeout = timeout == null ? Duration.ofSeconds(30) : timeout;
45+
sseReadTimeout = sseReadTimeout == null ? Duration.ofMinutes(5) : sseReadTimeout;
46+
}
47+
48+
public static Builder builder(String url) {
49+
return new Builder(url);
50+
}
51+
52+
/** Builder for {@link StreamableHttpServerParameters}. */
53+
public static class Builder {
54+
private final String url;
55+
private Map<String, String> headers;
56+
private Duration timeout = Duration.ofSeconds(30);
57+
private Duration sseReadTimeout = Duration.ofMinutes(5);
58+
private boolean terminateOnClose = true;
59+
60+
private Builder(String url) {
61+
Assert.hasText(url, "url must not be empty");
62+
this.url = url;
63+
}
64+
65+
@CanIgnoreReturnValue
66+
public Builder headers(Map<String, String> headers) {
67+
this.headers = headers;
68+
return this;
69+
}
70+
71+
@CanIgnoreReturnValue
72+
public Builder timeout(Duration timeout) {
73+
this.timeout = timeout;
74+
return this;
75+
}
76+
77+
@CanIgnoreReturnValue
78+
public Builder sseReadTimeout(Duration sseReadTimeout) {
79+
this.sseReadTimeout = sseReadTimeout;
80+
return this;
81+
}
82+
83+
@CanIgnoreReturnValue
84+
public Builder terminateOnClose(boolean terminateOnClose) {
85+
this.terminateOnClose = terminateOnClose;
86+
return this;
87+
}
88+
89+
public StreamableHttpServerParameters build() {
90+
return new StreamableHttpServerParameters(
91+
url, headers, timeout, sseReadTimeout, terminateOnClose);
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)