Documentation Index
Fetch the complete documentation index at: https://startai.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Model Context Protocol (MCP) 中的传输为客户端和服务器之间的通信提供了基础。传输层负责处理消息如何发送和接收的底层机制。
消息格式
MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式以进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。
使用了三种类型的 JSON-RPC 消息:
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}
{
jsonrpc: "2.0",
method: string,
params?: object
}
内置传输类型
MCP 包含两种标准传输实现:
标准输入/输出(stdio)
stdio 传输通过标准输入和输出流实现通信。这对于本地集成和命令行工具特别有用。
在以下情况下使用 stdio:
- 构建命令行工具
- 实现本地集成
- 需要简单的进程通信
- 使用 shell 脚本
TypeScript (Server)
TypeScript (Client)
Python (Server)
Python (Client)
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport();
await server.connect(transport);
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport({
command: "./server",
args: ["--option", "value"]
});
await client.connect(transport);
app = Server("example-server")
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
params = StdioServerParameters(
command="./server",
args=["--option", "value"]
)
async with stdio_client(params) as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
服务器发送事件(SSE)
SSE 传输通过 HTTP POST 请求实现服务器到客户端的流式传输和客户端到服务器的通信。
在以下情况下使用 SSE:
- 只需要服务器到客户端的流式传输
- 在受限网络环境中工作
- 实现简单的更新
安全警告:DNS 重绑定攻击
如果没有适当的安全措施,SSE 传输可能容易受到 DNS 重绑定攻击。为了防止这种情况:
- 始终验证传入 SSE 连接的 Origin 头,确保它们来自预期的来源
- 避免在本地运行时将服务器绑定到所有网络接口(0.0.0.0)- 而是只绑定到 localhost(127.0.0.1)
- 为所有 SSE 连接实现适当的身份验证
如果没有这些保护措施,攻击者可能会使用 DNS 重绑定从远程网站与本地 MCP 服务器进行交互。
TypeScript (Server)
TypeScript (Client)
Python (Server)
Python (Client)
import express from "express";
const app = express();
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)
async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
自定义传输
MCP 使实现自定义传输变得容易。任何传输实现只需要符合传输接口:
你可以实现自定义传输用于:
- 自定义网络协议
- 专用通信通道
- 与现有系统集成
- 性能优化
interface Transport {
// 开始处理消息
start(): Promise<void>;
// 发送 JSON-RPC 消息
send(message: JSONRPCMessage): Promise<void>;
// 关闭连接
close(): Promise<void>;
// 回调
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}
注意:虽然 MCP 服务器通常使用 asyncio 实现,但我们建议使用 anyio 实现低级接口(如传输)以获得更广泛的兼容性。@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
"""
MCP 的传输接口。
参数:
read_stream: 用于读取传入消息的流
write_stream: 用于写入传出消息的流
"""
async with anyio.create_task_group() as tg:
try:
# 开始处理消息
tg.start_soon(lambda: process_messages(read_stream))
# 发送消息
async with write_stream:
yield write_stream
except Exception as exc:
# 处理错误
raise exc
finally:
# 清理
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
错误处理
传输实现应该处理各种错误场景:
- 连接错误
- 消息解析错误
- 协议错误
- 网络超时
- 资源清理
错误处理示例:
class ExampleTransport implements Transport {
async start() {
try {
// 连接逻辑
} catch (error) {
this.onerror?.(new Error(`连接失败:${error}`));
throw error;
}
}
async send(message: JSONRPCMessage) {
try {
// 发送逻辑
} catch (error) {
this.onerror?.(new Error(`发送消息失败:${error}`));
throw error;
}
}
}
注意:虽然 MCP 服务器通常使用 asyncio 实现,但我们建议使用 anyio 实现低级接口(如传输)以获得更广泛的兼容性。@contextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
try:
# 创建双向通信的流
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
try:
async with read_stream_writer:
# 消息处理逻辑
pass
except Exception as exc:
logger.error(f"处理消息失败:{exc}")
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try:
# 提供流用于通信
yield read_stream, write_stream
except Exception as exc:
logger.error(f"传输错误:{exc}")
raise exc
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error(f"初始化传输失败:{exc}")
raise exc
最佳实践
在实现或使用 MCP 传输时:
- 正确处理连接生命周期
- 实现适当的错误处理
- 在连接关闭时清理资源
- 使用适当的超时机制
- 在发送前验证消息
- 记录传输事件以进行调试
- 在适当的情况下实现重连逻辑
- 处理消息队列中的背压
- 监控连接健康状况
- 实现适当的安全措施
安全考虑
在实现传输时:
身份验证和授权
- 实现适当的身份验证机制
- 验证客户端凭据
- 使用安全的令牌处理
- 实现授权检查
数据安全
- 对网络传输使用 TLS
- 加密敏感数据
- 验证消息完整性
- 实现消息大小限制
- 净化输入数据
网络安全
- 实现速率限制
- 使用适当的超时
- 处理拒绝服务场景
- 监控异常模式
- 实现适当的防火墙规则
- 对于 SSE 传输,验证 Origin 头以防止 DNS 重绑定攻击
- 对于本地 SSE 服务器,只绑定到 localhost(127.0.0.1)而不是所有接口(0.0.0.0)
调试传输
调试传输问题的技巧:
- 启用调试日志
- 监控消息流
- 检查连接状态
- 验证消息格式
- 测试错误场景
- 使用网络分析工具
- 实现健康检查
- 监控资源使用
- 测试边缘情况
- 使用适当的错误跟踪