fastmcp>=0.1.0
httpx>=0.25.0
#!/usr/bin/env python3
# =============================================================================
# s13_mcp_connector.py - MCP Connector / MCP 连接器
# =============================================================================
#
# 本文件展示如何使用 FastMCP 连接 MCP 服务器,让 agent 调用 MCP 工具
# This file shows how to use FastMCP to connect to MCP servers and call MCP tools
#
# 核心洞察 / Key Insight:
# "FastMCP 简化了 MCP 客户端的实现,支持本地和远程两种模式"
# "FastMCP simplifies MCP client implementation, supporting both local and remote modes"
#
# 流程图 / Flow Diagram:
# Agent FastMCPClient MCP Server
# +------------------+ +------------------+ +------------------+
# | messages=[...] | | | | |
# | | | connect() | | |
# | tool: mcp_hello | ----->| call_tool() | ----->| execute tool |
# | | | | | |
# | result = "..." |<------| return result |<------| return result |
# +------------------+ +------------------+ +------------------+
#
# MCP 连接模式 / MCP Connection Modes:
# - 本地模式 (stdio): 通过子进程启动 MCP 服务器
# - 远程模式 (HTTP): 通过 HTTP URL 连接远程 MCP 服务器
#
# =============================================================================
import os
import asyncio
import json
import subprocess
from pathlib import Path
from typing import Any, Callable
from anthropic import Anthropic
from dotenv import load_dotenv
# =============================================================================
# 环境配置 / Environment Configuration
# =============================================================================
load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
# 主代理系统提示词
# Parent system prompt
SYSTEM = f"You are a coding agent at {WORKDIR}. You have access to MCP tools."
# =============================================================================
# 基础工具实现 / Base Tool Implementations
# =============================================================================
def safe_path(p: str) -> Path:
"""路径安全检查 / Path security check"""
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
def run_bash(command: str) -> str:
"""执行bash命令 / Execute bash command"""
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
r = subprocess.run(
command,
shell=True,
cwd=WORKDIR,
capture_output=True,
text=True,
timeout=120
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
def run_read(path: str, limit: int = None) -> str:
"""读取文件 / Read file"""
try:
lines = safe_path(path).read_text().splitlines()
if limit and limit < len(lines):
lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
return "\n".join(lines)[:50000]
except Exception as e:
return f"Error: {e}"
def run_write(path: str, content: str) -> str:
"""写入文件 / Write file"""
try:
fp = safe_path(path)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content)
return f"Wrote {len(content)} bytes"
except Exception as e:
return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
"""编辑文件 / Edit file"""
try:
fp = safe_path(path)
content = fp.read_text()
if old_text not in content:
return f"Error: Text not found in {path}"
fp.write_text(content.replace(old_text, new_text, 1))
return f"Edited {path}"
except Exception as e:
return f"Error: {e}"
# =============================================================================
# MCP 连接器 / MCP Connector
# =============================================================================
class MCPConnector:
"""
MCP 连接器,支持本地和远程模式
MCP Connector, supports local and remote modes
本地模式: server = ["python", "/path/to/server.py"]
远程模式: server = "http://localhost:8000/mcp"
"""
def __init__(self, server):
"""
Args:
server: 本地模式为命令列表,远程模式为 URL
"""
self.server = server
self.process = None
self._connected = False
def connect(self) -> bool:
"""建立连接 / Establish connection"""
if isinstance(self.server, str):
# 远程模式:HTTP 连接不需要持久连接,直接标记已连接
# Remote mode: HTTP doesn't need persistent connection
self._connected = True
return True
else:
# 本地模式:启动子进程
# Local mode: start subprocess
try:
self.process = subprocess.Popen(
self.server,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self._connected = True
return True
except Exception as e:
return False
def disconnect(self):
"""断开连接 / Disconnect"""
if self.process:
self.process.terminate()
self.process = None
self._connected = False
def _send_request(self, method: str, params: dict) -> dict:
"""发送 JSON-RPC 2.0 请求 / Send JSON-RPC 2.0 request"""
if not self._connected:
self.connect()
request = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params
}
if isinstance(self.server, str):
# 远程模式:使用 httpx
# Remote mode: use httpx
import httpx
try:
response = httpx.post(
self.server,
json=request,
timeout=30.0
)
return response.json()
except Exception as e:
return {"error": str(e)}
else:
# 本地模式:通过 stdin/stdout
# Local mode: via stdin/stdout
try:
self.process.stdin.write(json.dumps(request).encode() + b"\n")
self.process.stdin.flush()
# 读取响应
response_line = self.process.stdout.readline()
return json.loads(response_line)
except Exception as e:
return {"error": str(e)}
def list_tools(self) -> list:
"""获取工具列表 / Get tool list"""
result = self._send_request("tools/list", {})
if "error" in result:
return []
return result.get("result", {}).get("tools", [])
def call_tool(self, name: str, arguments: dict) -> str:
"""调用工具 / Call tool"""
result = self._send_request("tools/call", {
"name": name,
"arguments": arguments
})
if "error" in result:
return f"Error: {result['error']}"
# 解析结果
tool_result = result.get("result", {})
content = tool_result.get("content", [])
if isinstance(content, list):
return "\n".join(
c.get("text", str(c)) for c in content if isinstance(c, dict)
)
return str(tool_result)
# =============================================================================
# MCP 工具创建 / MCP Tool Creation
# =============================================================================
def create_mcp_tools(server) -> list:
"""
从 MCP 服务器获取工具列表并转换为 agent 工具格式
Get tool list from MCP server and convert to agent tool format
Args:
server: 本地模式为命令列表,远程模式为 URL
Returns:
工具定义列表 / Tool definition list
"""
connector = MCPConnector(server)
try:
connector.connect()
mcp_tools_raw = connector.list_tools()
mcp_tools = []
for tool in mcp_tools_raw:
tool_name = tool.get("name", "")
mcp_tools.append({
"name": f"mcp_{tool_name}",
"description": f"{tool.get('description', '')} [MCP]",
"input_schema": tool.get("inputSchema", {"type": "object", "properties": {}})
})
return mcp_tools
finally:
connector.disconnect()
def run_mcp_tool(server, name: str, arguments: dict) -> str:
"""
调用 MCP 工具 / Call MCP tool
Args:
server: MCP 服务器配置
name: 工具名称(不含 mcp_ 前缀)
arguments: 工具参数
Returns:
工具执行结果 / Tool execution result
"""
connector = MCPConnector(server)
try:
connector.connect()
return connector.call_tool(name, arguments)
finally:
connector.disconnect()
# =============================================================================
# 工具分发映射 / Tool Dispatch Map
# =============================================================================
# 基础工具处理器
# Base tool handlers
BASE_TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}
# =============================================================================
# Agent 主循环 / Agent Main Loop
# =============================================================================
def agent_loop(messages: list, mcp_servers: list = None):
"""
Agent 主循环,支持 MCP 工具
Agent main loop with MCP tool support
Args:
messages: 对话历史
mcp_servers: MCP 服务器配置列表
"""
# 创建 MCP 工具
mcp_tools = []
mcp_handlers = {}
if mcp_servers:
for server in mcp_servers:
try:
tools = create_mcp_tools(server)
for tool in tools:
tool_name = tool["name"]
mcp_tools.append(tool)
# 提取原始 MCP 工具名(去掉 mcp_ 前缀)
mcp_name = tool_name[4:] if tool_name.startswith("mcp_") else tool_name
mcp_handlers[tool_name] = lambda n=mcp_name, s=server, **kw: run_mcp_tool(s, n, kw)
except Exception as e:
print(f"Failed to connect to MCP server: {e}")
# 合并所有工具
all_tools = [
{
"name": "bash",
"description": "Run a shell command.",
"input_schema": {
"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"]
}
},
{
"name": "read_file",
"description": "Read file contents.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"limit": {"type": "integer"}
},
"required": ["path"]
}
},
{
"name": "write_file",
"description": "Write content to file.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["path", "content"]
}
},
{
"name": "edit_file",
"description": "Replace exact text in file.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"old_text": {"type": "string"},
"new_text": {"type": "string"}
},
"required": ["path", "old_text", "new_text"]
}
},
] + mcp_tools
# 合并所有处理器
all_handlers = dict(BASE_TOOL_HANDLERS)
all_handlers.update(mcp_handlers)
# 主循环
while True:
response = client.messages.create(
model=MODEL,
system=SYSTEM,
messages=messages,
tools=all_tools,
max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
handler = all_handlers.get(block.name)
if handler:
try:
output = handler(**block.input)
except Exception as e:
output = f"Error: {e}"
else:
output = f"Unknown tool: {block.name}"
print(f" {str(output)[:200]}")
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)
})
messages.append({"role": "user", "content": results})
# =============================================================================
# MCP 配置加载 / MCP Configuration Loading
# =============================================================================
def load_mcp_servers(config_path: str = None) -> list:
"""
从配置文件加载 MCP 服务器配置
Load MCP server configurations from config file
支持从以下位置加载:
- ~/.claude/mcp.json
- 指定路径的配置文件
Args:
config_path: 配置文件路径,默认为 ~/.claude/mcp.json
Returns:
MCP 服务器配置列表
"""
if config_path is None:
config_path = Path.home() / ".claude" / "mcp.json"
else:
config_path = Path(config_path)
if not config_path.exists():
return []
try:
with open(config_path) as f:
config = json.load(f)
servers = []
for name, server_config in config.get("mcpServers", {}).items():
if not server_config.get("enabled", True):
continue
if "url" in server_config:
# 远程模式
servers.append(server_config["url"])
elif "command" in server_config:
# 本地模式
servers.append([server_config["command"]] + server_config.get("args", []))
return servers
except Exception as e:
print(f"Failed to load MCP config: {e}")
return []
# =============================================================================
# 主程序入口 / Main Entry Point
# =============================================================================
if __name__ == "__main__":
# 加载 MCP 服务器配置
mcp_servers = load_mcp_servers()
if mcp_servers:
print(f"Loaded {len(mcp_servers)} MCP server(s)")
for i, server in enumerate(mcp_servers):
print(f" {i+1}. {server}")
print()
history = []
while True:
try:
query = input("\033[36ms13 >> \033[0m")
except (EOFError, KeyboardInterrupt):
break
if query.strip().lower() in ("q", "exit", ""):
break
history.append({"role": "user", "content": query})
agent_loop(history, mcp_servers)
print()
added a file for mcp connect ,
增加一个 s13_mcp_connector ,MCP接入的代码,我这已经测试通过了;
attach fastmcp to requirement.txt 加依赖包
create a config file 建一个配置文件 .claude/mcp.json
{ "mcpServers": { "excel-process": { "type": "remote", "url": "http://10.193.40.137:8017/mcp", "enabled": true }, "word-document-sepd": { "type": "remote", "url": "http://10.193.40.137:7010/sse", "enabled": true } } }创建文件 s13_mcp_connector.py 代码如下 , create file named s13_mcp_connector.py ,as follow: