网站首页 > 教程文章 正文
1. 概述
Model Context Protocol (MCP) 是一个标准化的协议,使 AI 模型能够以结构化的方式与外部工具和资源交互。在 Spring AI 中,
HttpClientSseClientTransport 是实现 MCP 客户端通过 HTTP SSE(Server-Sent Events)与服务器通信的核心组件。
2. 核心架构
2.1 双向通信模式
MCP 采用双向通信模式:
- HTTP GET:客户端 → 服务器(请求建立)
- SSE 连接:服务器 → 客户端(推送事件)
- HTTP POST:客户端 → 服务器(按需发送)
2.2 核心组件关系
McpSyncClient (同步客户端)
↓ 委托
McpAsyncClient (异步客户端)
↓ 使用
McpClientSession (会话管理)
↓ 使用
HttpClientSseClientTransport (SSE 传输层)
↓ 使用
FlowSseClient (SSE 客户端)
3. 详细流程分析
3.1客户端创建阶段
3.1.1 传输层配置
// 1. HTTP 客户端配置
HttpClient.Builder httpClientBuilder = HttpClient.newBuilder();
httpClientBuilder.connectTimeout(Duration.ofSeconds(5));
// 2. HTTP 请求构建器配置
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
// 3. SSE 传输层构建
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder("http://localhost:8080")
.requestBuilder(requestBuilder)
.sseEndpoint("/customer/mcp/sse")
.objectMapper(objectMapper)
.clientBuilder(httpClientBuilder).build();
关键配置说明:
- connectTimeout(Duration.ofSeconds(5)):设置连接超时为 5 秒
- sseEndpoint("/customer/mcp/sse"):指定 SSE 端点路径
- objectMapper:用于 JSON 序列化/反序列化
3.1.2 客户端构建
// 4. MCP 同步客户端创建
McpSyncClient mcpSyncClient = McpClient.sync(transport)
.requestTimeout(Duration.ofSeconds(10))
.build();
构建过程详解:
在 McpClient.SyncSpec.build() 方法中:
public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(
this.clientInfo,
this.capabilities,
this.roots,
this.toolsChangeConsumers,
this.resourcesChangeConsumers,
this.promptsChangeConsumers,
this.loggingConsumers,
this.samplingHandler
);
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
return new McpSyncClient(
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures)
);
}
3.2 客户端初始化阶段
3.2.1 会话创建
在 McpAsyncClient 构造函数中:
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
McpClientFeatures.Async features) {
// 配置请求处理器
Map<String, RequestHandler<?>> requestHandlers = new HashMap<>();
// 配置通知处理器
Map<String, NotificationHandler> notificationHandlers = new HashMap<>();
// 创建 MCP 会话
this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
}
3.2.2 传输层连接建立
在 McpClientSession 构造函数中:
public McpClientSession(Duration requestTimeout, McpClientTransport transport,
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers) {
// 建立传输层连接
this.connection = this.transport.connect(mono -> mono.doOnNext(this::handle)).subscribe();
}
3.3SSE连接建立过程
3.3.1 连接初始化
在
HttpClientSseClientTransport.connect() 方法中:
@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
CompletableFuture<Void> future = new CompletableFuture<>();
connectionFuture.set(future);
// 构建 SSE 连接 URI
URI clientUri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
// 订阅 SSE 事件流
sseClient.subscribe(clientUri.toString(), new FlowSseClient.SseEventHandler() {
@Override
public void onEvent(SseEvent event) {
if (isClosing) {
return;
}
try {
if (ENDPOINT_EVENT_TYPE.equals(event.type())) {
// 处理端点发现事件
String endpoint = event.data();
messageEndpoint.set(endpoint);
closeLatch.countDown();
future.complete(null);
}
else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
// 处理 JSON-RPC 消息
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data());
handler.apply(Mono.just(message)).subscribe();
}
else {
logger.error("Received unrecognized SSE event type: {}", event.type());
}
}
catch (IOException e) {
logger.error("Error processing SSE event", e);
future.completeExceptionally(e);
}
}
@Override
public void onError(Throwable error) {
if (!isClosing) {
logger.error("SSE connection error", error);
future.completeExceptionally(error);
}
}
});
return Mono.fromFuture(future);
}
3.3.2 SSE 事件流处理
在 FlowSseClient.subscribe() 方法中:
public void subscribe(String url, SseEventHandler eventHandler) {
// 构建 SSE 请求
HttpRequest request = this.requestBuilder.uri(URI.create(url))
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.GET()
.build();
StringBuilder eventBuilder = new StringBuilder();
AtomicReference<String> currentEventId = new AtomicReference<>();
AtomicReference<String> currentEventType = new AtomicReference<>("message");
// 创建 Flow.Subscriber 处理 SSE 流
Flow.Subscriber<String> lineSubscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String line) {
if (line.isEmpty()) {
// 空行表示事件结束,处理完整事件
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(
currentEventId.get(),
currentEventType.get(),
eventData.trim()
);
eventHandler.onEvent(event);
eventBuilder.setLength(0);
}
}
else {
// 解析 SSE 事件字段
if (line.startsWith("data:")) {
var matcher = EVENT_DATA_PATTERN.matcher(line);
if (matcher.find()) {
eventBuilder.append(matcher.group(1).trim()).append("\n");
}
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
currentEventId.set(matcher.group(1).trim());
}
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
currentEventType.set(matcher.group(1).trim());
}
}
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
eventHandler.onError(throwable);
}
@Override
public void onComplete() {
// 处理剩余的事件数据
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(
currentEventId.get(),
currentEventType.get(),
eventData.trim()
);
eventHandler.onEvent(event);
}
}
};
// 发送异步请求
Function<Flow.Subscriber<String>, HttpResponse.BodySubscriber<Void>> subscriberFactory =
subscriber -> HttpResponse.BodySubscribers.fromLineSubscriber(subscriber);
CompletableFuture<HttpResponse<Void>> future = this.httpClient.sendAsync(request,
info -> subscriberFactory.apply(lineSubscriber));
future.thenAccept(response -> {
int status = response.statusCode();
if (status != 200 && status != 201 && status != 202 && status != 206) {
throw new RuntimeException("Failed to connect to SSE stream. Unexpected status code: " + status);
}
}).exceptionally(throwable -> {
eventHandler.onError(throwable);
return null;
});
}
3.4 客户端初始化
3.4.1初始化调用
// 5. 客户端初始化
mcpSyncClient.initialize();
3.4.2 初始化流程
在 McpAsyncClient.initialize() 方法中:
public Mono<McpSchema.InitializeResult> initialize() {
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
// 构建初始化请求
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(
latestVersion,
this.clientCapabilities,
this.clientInfo
);
// 发送初始化请求
Mono<McpSchema.InitializeResult> result = this.mcpSession.sendRequest(
McpSchema.METHOD_INITIALIZE,
initializeRequest,
new TypeReference<McpSchema.InitializeResult>() {}
);
return result.flatMap(initializeResult -> {
// 保存服务器信息
this.serverCapabilities = initializeResult.capabilities();
this.serverInstructions = initializeResult.instructions();
this.serverInfo = initializeResult.serverInfo();
logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
initializeResult.protocolVersion(), initializeResult.capabilities(),
initializeResult.serverInfo(), initializeResult.instructions());
// 验证协议版本
if (!this.protocolVersions.contains(initializeResult.protocolVersion())) {
return Mono.error(new McpError(
"Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
}
// 发送初始化完成通知
return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
.doOnSuccess(v -> {
this.initialized.set(true);
this.initializedSink.tryEmitValue(initializeResult);
})
.thenReturn(initializeResult);
});
}
3.5 请求发送机制
3.5.1 请求发送流程
在
McpClientSession.sendRequest() 方法中:
@Override
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
String requestId = this.generateRequestId();
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
// 保存待处理的响应
this.pendingResponses.put(requestId, sink);
// 构建 JSON-RPC 请求
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(
McpSchema.JSONRPC_VERSION,
method,
requestId,
requestParams
);
// 发送请求
this.transport.sendMessage(jsonrpcRequest)
.contextWrite(ctx)
.subscribe(v -> {
}, error -> {
this.pendingResponses.remove(requestId);
sink.error(error);
});
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
if (jsonRpcResponse.error() != null) {
logger.error("Error handling request: {}", jsonRpcResponse.error());
sink.error(new McpError(jsonRpcResponse.error()));
}
else {
if (typeRef.getType().equals(Void.class)) {
sink.complete();
}
else {
sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
}
}
});
}
3.5.2消息发送实现
在
HttpClientSseClientTransport.sendMessage() 方法中:
@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
if (isClosing) {
return Mono.empty();
}
try {
// 等待端点发现
if (!closeLatch.await(10, TimeUnit.SECONDS)) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
}
catch (InterruptedException e) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
String endpoint = messageEndpoint.get();
if (endpoint == null) {
return Mono.error(new McpError("No message endpoint available"));
}
try {
// 序列化消息
String jsonText = this.objectMapper.writeValueAsString(message);
// 构建请求 URI
URI requestUri = Utils.resolveUri(baseUri, endpoint);
// 构建 HTTP POST 请求
HttpRequest request = this.requestBuilder.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(jsonText))
.build();
// 发送异步请求
return Mono.fromFuture(
httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())
.thenAccept(response -> {
if (response.statusCode() != 200 && response.statusCode() != 201 &&
response.statusCode() != 202 && response.statusCode() != 206) {
logger.error("Error sending message: {}", response.statusCode());
}
})
);
}
catch (IOException e) {
if (!isClosing) {
return Mono.error(new RuntimeException("Failed to serialize message", e));
}
return Mono.empty();
}
}
4.关键机制总结
4.1双向通信机制
- SSE 连接:建立持久连接接收服务器推送
- 端点发现:通过 SSE 接收消息发送端点
- HTTP POST:使用发现的端点发送客户端消息
4.2事件处理机制
- endpoint 事件:服务器发送消息端点 URL
- message 事件:服务器发送 JSON-RPC 消息
- 错误处理:连接错误和消息处理错误
4.3会话管理机制
- 请求 ID 生成:确保请求-响应匹配
- 超时处理:请求超时和连接超时
- 状态管理:初始化状态和连接状态
4.4 错误处理机制
- 连接错误:SSE 连接失败处理
- 序列化错误:JSON 序列化/反序列化错误
- 超时错误:请求超时和端点发现超时
猜你喜欢
- 2025-08-05 浅谈ActiveMQ与使用
- 2025-08-05 Chinese premier calls for forging example of openness, development cooperation with ASEAN, GCC
- 2025-08-05 西门子博途有关通过 PUT/GET 指令通信的基本信息
- 2025-08-05 数据源连接池的原理及 Tomcat 中的应用
- 2025-08-05 ActiveMQ实现站内消息提醒功能
- 2025-08-05 Android 传统(经典)蓝牙框架
- 2025-08-05 前端测试新范式:Vitest+Playwright 如何让测试效率提升 400%
- 2025-08-05 亚马逊云代理商:怎样使用Glue构建ETL管道?
- 2025-08-05 一个基础又很重要的知识点:JDBC原理(基本案例和面试知识点)
- 2025-08-05 Python教程(三十):网络编程基础
- 08-05 Docker Compose 编排实战:一键部署多容器应用!
- 08-05 Docker 命令入门实战:搞懂这些才算真正入门!
- 08-05Docker 镜像构建加速与镜像瘦身实战,一篇就够!
- 08-05Docker 常用命令手册
- 08-05Docker命令大全详解(39个常用命令)
- 08-05Docker镜像越来越大?我用这 3 个技巧直接瘦身 80%!附实战优化对比
- 08-05Docker容器与镜像详解(6大核心区别)
- 08-05docker镜像操作
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- mybatis plus page (35)
- vue @scroll (38)
- 堆栈区别 (33)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)
- redis aof rdb 区别 (33)
- 302跳转 (33)
- http method (35)
- js array splice (33)