网站首页 > 教程文章 正文
本文为 MCP 系列的第二篇。在前文《MCP:连接大模型和外部世界的 USB-C 接口》,我们介绍了 MCP(模型上下文协议)的核心概念、架构和使用场景。本文详细介绍 MCP 客户端与客户端的交互流程,并以 MCP 的官方 python sdk 代码为例进行代码详解,以帮助我们更好地理解相关知识。
核心功能对象
- 会话管理:管理用户与 LLM 的交互对话,维护对话的历史消息;关联 MCP 客户端。
- MCP 客户端:保持与 MCP 服务端 1:1 连接的协议客户端。
- MCP 服务端:通过标准化的 MCP(模型上下文协议)暴露特定功能的轻量级程序(如,暴露工具,并提供工具调用服务)
- 工具集:MCP 服务端管理的工具集。
客户端与服务端交互流程
MCP 服务端可以提供资源、工具等 API,供客户端调用。这里以 MCP 的官方 python sdk 的示例代码 mcp_simple_chatbot (https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py)为例,说明 MCP 客户端与 MCP 服务端交互的交互流程,实现 LLM 对外部工具的调用。
- 用户输入问题
- 会话管理(ChatSession) 将问题发送给 LLMClient
- LLMClient 调用 LLM API,获取 LLM 的响应
- 根据 LLM 响应判断是否需要使用工具,若要使用工具,执行以下流程:
- 请求 Server 执行工具(请求内容包含工具名、参数等信息)
- Server 调用具体的 Tool,并返回工具执行结果
- ChatSession 将工具结果发送给 LLM
- LLM 根据工具执行结果,返回最终响应
- 会话管理(ChatSession)将最终响应返回给用户
mcp_simple_chatbot 代码详解
官方的 mcp_simple_chatbot 示例(https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py),实现了基于 MCP 协议的外部工具调用。以下介绍其核心代码。
Server
Server 类是 MCP 客户端中负责管理与 MCP 服务器连接和工具执行的核心组件。
1 初始化与配置管理
- 管理服务器名称和配置信息
- 维护客户端会话状态
- 使用锁机制确保资源清理的线程安全
- 使用 AsyncExitStack 管理异步资源
def __init__(self, name: str, config: dict[str, Any]) -> None:
self.name: str = name
self.config: dict[str, Any] = config
self.stdio_context: Any | None = None
self.session: ClientSession | None = None
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
self.exit_stack: AsyncExitStack = AsyncExitStack()
2 服务器连接初始化
这里的 MCP 服务端是在本地机器上运行的(运行相关命令 command,启动 MCP 服务端)
- 解析并验证服务器启动命令
- 配置服务器参数(命令、参数、环境变量)
- 建立与服务器的通信通道
- 初始化客户端会话
async def initialize(self) -> None:
"""Initialize the server connection."""
command = (
shutil.which("npx")
if self.config["command"] == "npx"
else self.config["command"]
)
if command is None:
raise ValueError("The command must be a valid string and cannot be None.")
server_params = StdioServerParameters(
command=command,
args=self.config["args"],
env={**os.environ, **self.config["env"]}
if self.config.get("env")
else None,
)
try:
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
read, write = stdio_transport
session = await self.exit_stack.enter_async_context(
ClientSession(read, write)
)
await session.initialize()
self.session = session
except Exception as e:
logging.error(f"Error initializing server {self.name}: {e}")
await self.cleanup()
raise
3 工具管理
- MCP 服务端提供相关的 API,暴露给其可提供的工具集信息(将外部资源工具,以接口服务的形式提供,这是 MCP 与 Function Call 的区别之一)。调用服务端 API,获取全部工具信息。
- 将工具信息转换为标准格式并返回
async def list_tools(self) -> list[Any]:
"""List available tools from the server.
Returns:
A list of available tools.
Raises:
RuntimeError: If the server is not initialized.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")
tools_response = await self.session.list_tools()
tools = []
for item in tools_response:
if isinstance(item, tuple) and item[0] == "tools":
for tool in item[1]:
tools.append(Tool(tool.name, tool.description, tool.inputSchema))
return tools
4 工具执行
- 执行指定的工具,并返回结果
- 提供重试机制处理临时故障,支持自定义重试次数和延迟
async def execute_tool(
self,
tool_name: str,
arguments: dict[str, Any],
retries: int = 2,
delay: float = 1.0,
) -> Any:
"""Execute a tool with retry mechanism.
Args:
tool_name: Name of the tool to execute.
arguments: Tool arguments.
retries: Number of retry attempts.
delay: Delay between retries in seconds.
Returns:
Tool execution result.
Raises:
RuntimeError: If server is not initialized.
Exception: If tool execution fails after all retries.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")
attempt = 0
while attempt < retries:
try:
logging.info(f"Executing {tool_name}...")
result = await self.session.call_tool(tool_name, arguments)
return result
except Exception as e:
attempt += 1
logging.warning(
f"Error executing tool: {e}. Attempt {attempt} of {retries}."
)
if attempt < retries:
logging.info(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
else:
logging.error("Max retries reached. Failing.")
raise
Tool 类
Tool 类提供的工具的定义和格式化功能
1 定义
- 工具名称:唯一标识符
- 工具描述:说明工具的功能和用途
- 输入模式:定义工具的参数和类型
def __init__(
self, name: str, description: str, input_schema: dict[str, Any]
) -> None:
self.name: str = name
self.description: str = description
self.input_schema: dict[str, Any] = input_schema
2 格式化功能
为 LLM 格式化工具信息
- 生成参数描述,标记必需参数
- 提供清晰的工具文档
def format_for_llm(self) -> str:
"""Format tool information for LLM.
Returns:
A formatted string describing the tool.
"""
args_desc = []
if "properties" in self.input_schema:
for param_name, param_info in self.input_schema["properties"].items():
arg_desc = (
f"- {param_name}: {param_info.get('description', 'No description')}"
)
if param_name in self.input_schema.get("required", []):
arg_desc += " (required)"
args_desc.append(arg_desc)
return f"""
Tool: {self.name}
Description: {self.description}
Arguments:
{chr(10).join(args_desc)}
"""
LLMClient
LLMClient 负责管理与 LLM 的连接,向 LLM 发送请求,获取 LLM 返回的响应
try:
with httpx.Client() as client:
response = client.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
except httpx.RequestError as e:
error_message = f"Error getting LLM response: {str(e)}"
logging.error(error_message)
if isinstance(e, httpx.HTTPStatusError):
status_code = e.response.status_code
logging.error(f"Status code: {status_code}")
logging.error(f"Response details: {e.response.text}")
return (
f"I encountered an error: {error_message}. "
"Please try again or rephrase your request."
)
ChatSession 类(会话管理)
ChatSession 负责协调用户、LLM 和客户端之间交互
1 LLM 响应处理
- 解析 LLM 响应,判断 LLM 是否要求调用工具
- 若 LLM 响应包含调用工具信息,执行相应的工具,并返回工具执行结果
async def process_llm_response(self, llm_response: str) -> str:
"""Process the LLM response and execute tools if needed.
Args:
llm_response: The response from the LLM.
Returns:
The result of tool execution or the original response.
"""
import json
try:
tool_call = json.loads(llm_response)
if "tool" in tool_call and "arguments" in tool_call:
logging.info(f"Executing tool: {tool_call['tool']}")
logging.info(f"With arguments: {tool_call['arguments']}")
for server in self.servers:
tools = await server.list_tools()
if any(tool.name == tool_call["tool"] for tool in tools):
try:
result = await server.execute_tool(
tool_call["tool"], tool_call["arguments"]
)
if isinstance(result, dict) and "progress" in result:
progress = result["progress"]
total = result["total"]
percentage = (progress / total) * 100
logging.info(
f"Progress: {progress}/{total} "
f"({percentage:.1f}%)"
)
return f"Tool execution result: {result}"
except Exception as e:
error_msg = f"Error executing tool: {str(e)}"
logging.error(error_msg)
return error_msg
return f"No server found with tool: {tool_call['tool']}"
return llm_response
except json.JSONDecodeError:
return llm_response
2 对话管理
- 初始化服务器环境
- 收集 MCP 服务端提供的全部工具信息
- 构建系统提示(这里的系统 prompt 比较关键,要求 LLM 自行判断是否要执行工具。LLM 若判断要执行工具,需要按照规定的 Json 格式,返回工具调用请求信息)
主循环处理:
- 处理用户输入
- 管理对话上下文,将历史对话信息保存在 messages
- 协调工具执行,并将工具执行结果保存到对话上下文。
- 生成最终响应
async def start(self) -> None:
"""Main chat session handler."""
try:
for server in self.servers:
try:
await server.initialize()
except Exception as e:
logging.error(f"Failed to initialize server: {e}")
await self.cleanup_servers()
return
all_tools = []
for server in self.servers:
tools = await server.list_tools()
all_tools.extend(tools)
tools_description = "\n".join([tool.format_for_llm() for tool in all_tools])
system_message = (
"You are a helpful assistant with access to these tools:\n\n"
f"{tools_description}\n"
"Choose the appropriate tool based on the user's question. "
"If no tool is needed, reply directly.\n\n"
"IMPORTANT: When you need to use a tool, you must ONLY respond with "
"the exact JSON object format below, nothing else:\n"
"{\n"
' "tool": "tool-name",\n'
' "arguments": {\n'
' "argument-name": "value"\n'
" }\n"
"}\n\n"
"After receiving a tool's response:\n"
"1. Transform the raw data into a natural, conversational response\n"
"2. Keep responses concise but informative\n"
"3. Focus on the most relevant information\n"
"4. Use appropriate context from the user's question\n"
"5. Avoid simply repeating the raw data\n\n"
"Please use only the tools that are explicitly defined above."
)
messages = [{"role": "system", "content": system_message}]
while True:
try:
user_input = input("You: ").strip().lower()
if user_input in ["quit", "exit"]:
logging.info("\nExiting...")
break
messages.append({"role": "user", "content": user_input})
llm_response = self.llm_client.get_response(messages)
logging.info("\nAssistant: %s", llm_response)
result = await self.process_llm_response(llm_response)
if result != llm_response:
messages.append({"role": "assistant", "content": llm_response})
messages.append({"role": "system", "content": result})
final_response = self.llm_client.get_response(messages)
logging.info("\nFinal response: %s", final_response)
messages.append(
{"role": "assistant", "content": final_response}
)
else:
messages.append({"role": "assistant", "content": llm_response})
except KeyboardInterrupt:
logging.info("\nExiting...")
break
finally:
await self.cleanup_servers()
main 函数
- 设置必要的环境
- 初始化所有组件
- 建立必要的连接
- 启动交互式会话
async def main() -> None:
"""Initialize and run the chat session."""
config = Configuration()
server_config = config.load_config("servers_config.json")
servers = [
Server(name, srv_config)
for name, srv_config in server_config["mcpServers"].items()
]
llm_client = LLMClient(config.llm_api_key)
chat_session = ChatSession(servers, llm_client)
await chat_session.start()
以上是官方的 mcp_simple_chatbot 示例的代码详解。
若你对数据和 AI 感兴趣,欢迎关注我的公众号:数智脉动,一起探索数智之旅
猜你喜欢
- 2025-05-03 用Arduino mega2560及Ethernet扩展板做一个web简易服务器
- 2025-05-03 不买光盘 不用下载 Stadia:游戏要云起来
- 2025-05-03 linux命令 - fuser、lsof、pidof学习
- 2025-05-03 Linux与windows共享文件的神器:samba
- 2025-05-03 SpringBoot数据校验与优雅处理详解
- 2025-05-03 用活firewalld防火墙之service(防火墙运行命令)
- 2025-05-03 深度剖析 Spring Cloud Eureka 底层实现原理
- 2025-05-03 分享一下面试被问了负载均衡的这些问题
- 2025-05-03 一分钟了解SpringCloud中的ribbon到底是什么,原理是啥?
- 2025-05-03 内存最大,最省电NAS装机:34瓦128GB内存,14盘位NAS设置教程
- 最近发表
-
- 一课译词:一刀两断(一刀两断成语解释)
- 核心短语break up用法解析(breakd短语)
- HTML+CSS 实现商品图片列表放大视觉效果 复制完整代码即可马上调用
- 前端实现右键自定义菜单(前端实现右键自定义菜单怎么设置)
- Python中docx与docxcompose批量合并多个Word文档并添加分页符
- Java 将Excel转为XML(java将xls转换成xlsx)
- jq+ajax+bootstrap改了一个动态分页的表格
- css兼容性问题及一些常见问题汇总大全,赶快收藏!
- Java 的业务逻辑验证框架 之-fluent-validator
- 小程序cover-view踩坑系列2(微信小程序overflow)
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- mybatis plus page (35)
- vue @scroll (38)
- 堆栈区别 (33)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)
- redis aof rdb 区别 (33)
- 302跳转 (33)
- http method (35)
- js array splice (33)