从零实现MCP Server+Client:内存管理、工具路由熔断生产级设计
摘要:硬核到编译级|从零实现MCP Server+Client:含内存上下文管理、工具路由熔断等生产级设计想自己造一个MCP Server,而不是仅仅调用别人的?厌倦了黑盒,想彻底搞懂Claude、龙虾(yitb.com)这些AI客户端是怎么和外部工具“对话”的?今天,我们不谈概念,直接动手,从协议解析到代码实现,带你构建一个具备生产级特性的MCP Server,并完成与客户端的联调。一、MCP协...

硬核到编译级|从零实现MCP Server+Client:含内存上下文管理、工具路由熔断等生产级设计
想自己造一个MCP Server,而不是仅仅调用别人的?厌倦了黑盒,想彻底搞懂Claude、龙虾(yitb.com)这些AI客户端是怎么和外部工具“对话”的?今天,我们不谈概念,直接动手,从协议解析到代码实现,带你构建一个具备生产级特性的MCP Server,并完成与客户端的联调。
一、MCP协议核心:不只是HTTP,更是状态化的工具会话
很多人误以为MCP就是简单的HTTP API。错了。它的核心是有状态的会话和结构化的工具调用。想象一下,你和Claude聊天,它突然调用你的天气工具,这个过程不是一次性的请求-响应,而是一个持续的、上下文感知的协作。
协议基于JSON-RPC 2.0,但增加了几个关键概念:
- 会话(Session):客户端连接后,Server会创建一个唯一会话ID,后续所有通信都绑定于此。这允许我们管理每个连接的独立状态。
- 工具(Tool):一个可调用的函数,有名称、描述、输入输出Schema(通常用JSON Schema定义)。这是AI的“手”。
- 资源(Resource):可被AI读取的数据,如文件、数据库记录。这是AI的“眼睛”。
- 提示(Prompt):预定义的交互模板,引导AI使用工具。
一次典型的工具调用流程如下:客户端(如Claude) -> 发送initialize请求,建立会话 -> 发送tools/list,获取可用工具列表 -> 用户提问触发 -> 发送tools/call,附带工具名和参数 -> Server执行,返回结果 -> 客户端整合结果,生成最终回复。
二、从零搭建:用Python实现一个MCP Server
我们使用Python的mcp官方库作为基础,它处理了底层的JSON-RPC和会话管理,让我们专注于业务逻辑。但我们会深入其下,展示如何管理状态和实现高级特性。
步骤1:环境准备与基础服务器
pip install mcp创建 server.py:
from mcp.server import Server
from mcp.types import Tool, TextContent
import json
# 初始化Server,给它一个名字
app = Server("my-production-server")
# 模拟一个内存数据库,用于存储会话上下文(生产环境请用Redis)
session_contexts = {}
# 定义一个工具:获取用户信息
@app.tool()
async def get_user_info(user_id: str) -> TextContent:
"""根据用户ID获取用户详细信息。这是一个模拟工具。"""
# 这里可以接入真实数据库
mock_data = {
"U1001": {"name": "张三", "vip_level": 3, "balance": 150.5},
"U1002": {"name": "李四", "vip_level": 1, "balance": 30.0}
}
user = mock_data.get(user_id, {"error": "用户不存在"})
return TextContent(type="text", text=json.dumps(user, ensure_ascii=False))
# 实现一个简单的内存上下文管理中间件(核心生产特性)
@app.middleware()
async def context_manager(request, call_next):
session_id = request.session_id
# 为每个会话初始化或获取上下文
if session_id not in session_contexts:
session_contexts[session_id] = {"call_count": 0, "last_tool": None}
# 调用前:增加调用计数
session_contexts[session_id]["call_count"] += 1
# 执行实际工具调用
response = await call_next(request)
# 调用后:记录最后使用的工具
if hasattr(request, 'params') and hasattr(request.params, 'name'):
session_contexts[session_id]["last_tool"] = request.params.name
print(f"会话 {session_id} 上下文: {session_contexts[session_id]}")
return response
if __name__ == "__main__":
# 以stdio模式运行,这是Claude Desktop等客户端常用的方式
app.run()步骤2:实现工具路由熔断机制
生产环境中,某个工具(如调用外部API)可能会失败或超时。熔断器模式可以防止级联故障。
在 server.py 中添加:
import time
from enum import Enum
class CircuitBreakerState(Enum):
CLOSED = "closed" # 正常,请求可通过
OPEN = "open" # 熔断,快速失败
HALF_OPEN = "half_open" # 尝试恢复
class CircuitBreaker:
def __init__(self, failure_threshold=3, recovery_timeout=10):
self.failure_count = 0
self.state = CircuitBreakerState.CLOSED
self.last_failure_time = None
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
def call(self, func, *args, **kwargs):
if self.state == CircuitBreakerState.OPEN:
# 检查是否到了尝试恢复的时间
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitBreakerState.HALF_OPEN
else:
raise Exception("熔断器开启,服务暂时不可用")

try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
self.failure_count = 0
self.state = CircuitBreakerState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
# 为可能不稳定的工具创建熔断器实例
unstable_tool_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=15)
@app.tool()
async def call_external_api(query: str) -> TextContent:
"""一个可能失败的外部API调用示例。"""
def _unstable_call():
# 模拟不稳定的外部服务
import random
if random.random() < 0.3: # 30%概率失败
raise ConnectionError("外部API连接超时")
return f"查询结果:{query} 的相关信息"
try:
# 通过熔断器调用
result = unstable_tool_breaker.call(_unstable_call)
return TextContent(type="text", text=result)
except Exception as e:
return TextContent(type="text", text=f"工具调用失败:{str(e)}")三、编写客户端进行联调
创建一个简单的客户端 client.py 来测试我们的Server。
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import asyncio
async def main():
# 指定要连接的Server命令
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
async with stdio_client(server_params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
# 1. 初始化
await session.initialize()
print("连接成功!")
# 2. 列出所有可用工具
tools = await session.list_tools()
print(f"可用工具: {[tool.name for tool in tools.tools]}")
# 3. 调用一个工具
result = await session.call_tool("get_user_info", {"user_id": "U1001"})
print(f"调用结果: {result.content[0].text}")
# 4. 测试熔断:多次调用可能失败的工具
for i in range(5):
try:
result = await session.call_tool("call_external_api", {"query": f"测试{i}"})
print(f"第{i}次调用: {result.content[0].text}")
except Exception as e:
print(f"第{i}次调用异常: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())部署步骤:
- 将
server.py和client.py放在同一目录。 - 先运行
python server.py(它会等待stdin输入),或者直接由客户端启动。 - 运行
python client.py,观察控制台输出。你会看到会话上下文的打印,以及熔断器在工具失败时的行为变化。
四、商业价值与应用场景
这套设计直接对应真实需求:
- 会话上下文管理:可用于实现多轮工具调用。例如,用户先说“查我上个月订单”,AI调用
get_orders;接着说“把第一个未发货的退掉”,AI需要知道“第一个”指代的是上一轮结果。上下文就是记忆。 - 工具熔断:当你的MCP Server集成了支付、短信等第三方服务时,熔断机制能保证在依赖服务故障时,核心AI对话不崩溃,优雅降级。这是SaaS产品稳定性的关键。
- 结构化工具:让你的AI应用从“只会聊天”变成“能执行复杂工作流”。一个管理客服工单的AI Agent,通过
create_ticket、assign_ticket、close_ticket等工具,就能真正处理业务。
下一步行动
- 立即动手:复制上面的代码,跑通整个流程。修改工具逻辑,尝试接入一个真实的API(比如天气API)。
- 扩展设计:为你的Server添加认证中间件(验证客户端API Key)、日志中间件(记录所有调用)和速率限制。
- 集成到生态:将你的Server配置到Claude Desktop或龙虾(yitb.com)客户端的配置文件中,让你的AI助手直接使用你开发的工具。配置方法通常是在客户端的
mcp_servers.json中添加一项,指向你的server.py。 - 思考商业化:你开发的这个“订单查询MCP Server”或“数据分析MCP Server”,是否可以打包,提供给其他AI应用开发者使用?这就是在构建AI Agent生态的工具层。
从理解协议到实现生产特性,你现在已经掌握了构建下一代AI集成应用的核心能力。接下来,是时候用它来解决一个具体问题了。