Skip to content

增加一个 s13_mcp_connector ,MCP接入的代码 (FastMCP);added a file for mcp connect ,using FastMCP ; #253

@sky92archangel

Description

@sky92archangel

added a file for mcp connect ,
增加一个 s13_mcp_connector ,MCP接入的代码,我这已经测试通过了;

attach fastmcp to requirement.txt 加依赖包

fastmcp>=0.1.0
httpx>=0.25.0

create a config file 建一个配置文件 .claude/mcp.json

{
    "mcpServers": {
        "excel-process": {
            "type": "remote",
            "url": "http://10.193.40.137:8017/mcp",
            "enabled": true
        },
        "word-document-sepd": {
            "type": "remote",
            "url": "http://10.193.40.137:7010/sse",
            "enabled": true
        }
    }
}

创建文件 s13_mcp_connector.py 代码如下 , create file named s13_mcp_connector.py ,as follow:

#!/usr/bin/env python3
# =============================================================================
# s13_mcp_connector.py - MCP Connector / MCP 连接器
# =============================================================================
#
# 本文件展示如何使用 FastMCP 连接 MCP 服务器,让 agent 调用 MCP 工具
# This file shows how to use FastMCP to connect to MCP servers and call MCP tools
#
# 核心洞察 / Key Insight:
#   "FastMCP 简化了 MCP 客户端的实现,支持本地和远程两种模式"
#   "FastMCP simplifies MCP client implementation, supporting both local and remote modes"
#
# 流程图 / Flow Diagram:
#   Agent                      FastMCPClient              MCP Server
#   +------------------+       +------------------+       +------------------+
#   | messages=[...]   |       |                  |       |                  |
#   |                  |       |  connect()       |       |                  |
#   | tool: mcp_hello  | ----->|  call_tool()     | ----->|  execute tool    |
#   |                  |       |                  |       |                  |
#   |   result = "..." |<------|  return result   |<------|  return result   |
#   +------------------+       +------------------+       +------------------+
#
# MCP 连接模式 / MCP Connection Modes:
#   - 本地模式 (stdio): 通过子进程启动 MCP 服务器
#   - 远程模式 (HTTP): 通过 HTTP URL 连接远程 MCP 服务器
#
# =============================================================================

import os
import asyncio
import json
import subprocess
from pathlib import Path
from typing import Any, Callable

from anthropic import Anthropic
from dotenv import load_dotenv

# =============================================================================
# 环境配置 / Environment Configuration
# =============================================================================

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

# 主代理系统提示词
# Parent system prompt
SYSTEM = f"You are a coding agent at {WORKDIR}. You have access to MCP tools."


# =============================================================================
# 基础工具实现 / Base Tool Implementations
# =============================================================================

def safe_path(p: str) -> Path:
    """路径安全检查 / Path security check"""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    """执行bash命令 / Execute bash command"""
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, 
            shell=True, 
            cwd=WORKDIR,
            capture_output=True, 
            text=True, 
            timeout=120
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int = None) -> str:
    """读取文件 / Read file"""
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    """写入文件 / Write file"""
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    """编辑文件 / Edit file"""
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# =============================================================================
# MCP 连接器 / MCP Connector
# =============================================================================

class MCPConnector:
    """
    MCP 连接器,支持本地和远程模式
    MCP Connector, supports local and remote modes
    
    本地模式: server = ["python", "/path/to/server.py"]
    远程模式: server = "http://localhost:8000/mcp"
    """
    
    def __init__(self, server):
        """
        Args:
            server: 本地模式为命令列表,远程模式为 URL
        """
        self.server = server
        self.process = None
        self._connected = False
        
    def connect(self) -> bool:
        """建立连接 / Establish connection"""
        if isinstance(self.server, str):
            # 远程模式:HTTP 连接不需要持久连接,直接标记已连接
            # Remote mode: HTTP doesn't need persistent connection
            self._connected = True
            return True
        else:
            # 本地模式:启动子进程
            # Local mode: start subprocess
            try:
                self.process = subprocess.Popen(
                    self.server,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
                self._connected = True
                return True
            except Exception as e:
                return False
    
    def disconnect(self):
        """断开连接 / Disconnect"""
        if self.process:
            self.process.terminate()
            self.process = None
        self._connected = False
    
    def _send_request(self, method: str, params: dict) -> dict:
        """发送 JSON-RPC 2.0 请求 / Send JSON-RPC 2.0 request"""
        if not self._connected:
            self.connect()
            
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }
        
        if isinstance(self.server, str):
            # 远程模式:使用 httpx
            # Remote mode: use httpx
            import httpx
            try:
                response = httpx.post(
                    self.server,
                    json=request,
                    timeout=30.0
                )
                return response.json()
            except Exception as e:
                return {"error": str(e)}
        else:
            # 本地模式:通过 stdin/stdout
            # Local mode: via stdin/stdout
            try:
                self.process.stdin.write(json.dumps(request).encode() + b"\n")
                self.process.stdin.flush()
                
                # 读取响应
                response_line = self.process.stdout.readline()
                return json.loads(response_line)
            except Exception as e:
                return {"error": str(e)}
    
    def list_tools(self) -> list:
        """获取工具列表 / Get tool list"""
        result = self._send_request("tools/list", {})
        if "error" in result:
            return []
        return result.get("result", {}).get("tools", [])
    
    def call_tool(self, name: str, arguments: dict) -> str:
        """调用工具 / Call tool"""
        result = self._send_request("tools/call", {
            "name": name,
            "arguments": arguments
        })
        if "error" in result:
            return f"Error: {result['error']}"
        
        # 解析结果
        tool_result = result.get("result", {})
        content = tool_result.get("content", [])
        
        if isinstance(content, list):
            return "\n".join(
                c.get("text", str(c)) for c in content if isinstance(c, dict)
            )
        return str(tool_result)


# =============================================================================
# MCP 工具创建 / MCP Tool Creation
# =============================================================================

def create_mcp_tools(server) -> list:
    """
    从 MCP 服务器获取工具列表并转换为 agent 工具格式
    Get tool list from MCP server and convert to agent tool format
    
    Args:
        server: 本地模式为命令列表,远程模式为 URL
        
    Returns:
        工具定义列表 / Tool definition list
    """
    connector = MCPConnector(server)
    try:
        connector.connect()
        mcp_tools_raw = connector.list_tools()
        
        mcp_tools = []
        for tool in mcp_tools_raw:
            tool_name = tool.get("name", "")
            mcp_tools.append({
                "name": f"mcp_{tool_name}",
                "description": f"{tool.get('description', '')} [MCP]",
                "input_schema": tool.get("inputSchema", {"type": "object", "properties": {}})
            })
        return mcp_tools
    finally:
        connector.disconnect()


def run_mcp_tool(server, name: str, arguments: dict) -> str:
    """
    调用 MCP 工具 / Call MCP tool
    
    Args:
        server: MCP 服务器配置
        name: 工具名称(不含 mcp_ 前缀)
        arguments: 工具参数
        
    Returns:
        工具执行结果 / Tool execution result
    """
    connector = MCPConnector(server)
    try:
        connector.connect()
        return connector.call_tool(name, arguments)
    finally:
        connector.disconnect()


# =============================================================================
# 工具分发映射 / Tool Dispatch Map
# =============================================================================

# 基础工具处理器
# Base tool handlers
BASE_TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}


# =============================================================================
# Agent 主循环 / Agent Main Loop
# =============================================================================

def agent_loop(messages: list, mcp_servers: list = None):
    """
    Agent 主循环,支持 MCP 工具
    Agent main loop with MCP tool support
    
    Args:
        messages: 对话历史
        mcp_servers: MCP 服务器配置列表
    """
    # 创建 MCP 工具
    mcp_tools = []
    mcp_handlers = {}
    
    if mcp_servers:
        for server in mcp_servers:
            try:
                tools = create_mcp_tools(server)
                for tool in tools:
                    tool_name = tool["name"]
                    mcp_tools.append(tool)
                    # 提取原始 MCP 工具名(去掉 mcp_ 前缀)
                    mcp_name = tool_name[4:] if tool_name.startswith("mcp_") else tool_name
                    mcp_handlers[tool_name] = lambda n=mcp_name, s=server, **kw: run_mcp_tool(s, n, kw)
            except Exception as e:
                print(f"Failed to connect to MCP server: {e}")
    
    # 合并所有工具
    all_tools = [
        {
            "name": "bash", 
            "description": "Run a shell command.",
            "input_schema": {
                "type": "object", 
                "properties": {"command": {"type": "string"}}, 
                "required": ["command"]
            }
        },
        {
            "name": "read_file", 
            "description": "Read file contents.",
            "input_schema": {
                "type": "object", 
                "properties": {
                    "path": {"type": "string"}, 
                    "limit": {"type": "integer"}
                }, 
                "required": ["path"]
            }
        },
        {
            "name": "write_file", 
            "description": "Write content to file.",
            "input_schema": {
                "type": "object", 
                "properties": {
                    "path": {"type": "string"}, 
                    "content": {"type": "string"}
                }, 
                "required": ["path", "content"]
            }
        },
        {
            "name": "edit_file", 
            "description": "Replace exact text in file.",
            "input_schema": {
                "type": "object", 
                "properties": {
                    "path": {"type": "string"}, 
                    "old_text": {"type": "string"}, 
                    "new_text": {"type": "string"}
                }, 
                "required": ["path", "old_text", "new_text"]
            }
        },
    ] + mcp_tools
    
    # 合并所有处理器
    all_handlers = dict(BASE_TOOL_HANDLERS)
    all_handlers.update(mcp_handlers)
    
    # 主循环
    while True:
        response = client.messages.create(
            model=MODEL, 
            system=SYSTEM,
            messages=messages,
            tools=all_tools, 
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        
        if response.stop_reason != "tool_use":
            return
        
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = all_handlers.get(block.name)
                if handler:
                    try:
                        output = handler(**block.input)
                    except Exception as e:
                        output = f"Error: {e}"
                else:
                    output = f"Unknown tool: {block.name}"
                
                print(f"  {str(output)[:200]}")
                results.append({
                    "type": "tool_result", 
                    "tool_use_id": block.id, 
                    "content": str(output)
                })
        messages.append({"role": "user", "content": results})


# =============================================================================
# MCP 配置加载 / MCP Configuration Loading
# =============================================================================

def load_mcp_servers(config_path: str = None) -> list:
    """
    从配置文件加载 MCP 服务器配置
    Load MCP server configurations from config file
    
    支持从以下位置加载:
    - ~/.claude/mcp.json
    - 指定路径的配置文件
    
    Args:
        config_path: 配置文件路径,默认为 ~/.claude/mcp.json
        
    Returns:
        MCP 服务器配置列表
    """
    if config_path is None:
        config_path = Path.home() / ".claude" / "mcp.json"
    else:
        config_path = Path(config_path)
    
    if not config_path.exists():
        return []
    
    try:
        with open(config_path) as f:
            config = json.load(f)
        
        servers = []
        for name, server_config in config.get("mcpServers", {}).items():
            if not server_config.get("enabled", True):
                continue
                
            if "url" in server_config:
                # 远程模式
                servers.append(server_config["url"])
            elif "command" in server_config:
                # 本地模式
                servers.append([server_config["command"]] + server_config.get("args", []))
        
        return servers
    except Exception as e:
        print(f"Failed to load MCP config: {e}")
        return []


# =============================================================================
# 主程序入口 / Main Entry Point
# =============================================================================

if __name__ == "__main__":
    # 加载 MCP 服务器配置
    mcp_servers = load_mcp_servers()
    
    if mcp_servers:
        print(f"Loaded {len(mcp_servers)} MCP server(s)")
        for i, server in enumerate(mcp_servers):
            print(f"  {i+1}. {server}")
        print()
    
    history = []
    
    while True:
        try:
            query = input("\033[36ms13 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        
        if query.strip().lower() in ("q", "exit", ""):
            break
        
        history.append({"role": "user", "content": query})
        agent_loop(history, mcp_servers)
        print()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions