您现在的位置是:首页 > 文章详情

万字长文解读MCP框架,让你掌握mark3labs/mcp-go

日期:2025-03-27点击:378

一、引言

《万字长文,带你读懂 Anthropic MCP》中我们介绍了MCP的基本框架和组件,并初步说了在golang中的框架metoro-io/mcp-golang和mark3labs/mcp-go。本文将通过实践和源码的方式先解读mark3labs/mcp-go。

二、MCP-Server的简述

MCP Server一般为轻量的服务端程序,通过一种标准的协议(MCP)暴露出特定资源的一些特定的能力。

2.1 连接生命周期

2.1.1 初始化连接

  1. 客户端发送带有协议版本和功能initialize请求。
  2. 服务器以其协议版本和功能进行响应
  3. 客户端发送initialized通知作为确认
  4. 开始正常信息交换

2.1.2 信息交换

初始化后,支持以下模式:
  1. 请求-响应:客户端或服务器发送请求,对方响应
  2. 通知:任何一方发送单向消息

2.1.3 终止

enum ErrorCode {
  // Standard JSON-RPC error codes
  ParseError = -32700,
  InvalidRequest = -32600,
  MethodNotFound = -32601,
  InvalidParams = -32602,
  InternalError = -32603
}
任何一方都可以终止连接:
  1. 通过close()干净关闭
  2. 传输断开
  3. 错误情况
    1. 请求的错误响应
    2. 传输中的错误事件
    3. 协议级错误处理程序
常见错误码:
 

2.2 MCP Server的业务能力

Request Method 发起方 响应方 描述
initialized Client Server 初始化会话
tools-list Client Server 发现可用的工具
tools/call Client Server 调用工具
resources/list Client Server 发现可用的资源
resources/read Client Server 获取资源内容
resources/templates Client Server 发现可用的参数化资源
resources/subscribe Client Server 订阅特定资源,监听其变化事件
prompts/list Client Server 发现可用的提示词
prompts/get Client Server 获取特定提示词
roots/list Server Client 列出服务器有权访问的客户端文件系统根节点(暴露目录和文件)
sampling/create Server Client 启用服务器的AI生成能力( sampling creation )

三、mark3labs/mcp-go示例

我们就以框架中的官方示例代码为引子一步步解读流程和框架代码。
package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/mark3labs/mcp-go/mcp"
    "github.com/mark3labs/mcp-go/server"
)

func main() {
    // 创建MCP服务器
    s := server.NewMCPServer(
       "Demo 🚀", // 服务器名称
       "1.0.0",  // 服务器版本
    )

    // 添加工具
    tool := mcp.NewTool("hello_world", // 工具名称
       mcp.WithDescription("Say hello to someone"), // 工具描述
       mcp.WithString("name", // 参数名称
          mcp.Required(), // 参数是必需的
          mcp.Description("Name of the person to greet"), // 参数描述
       ),
    )

    // 为工具添加处理器
    s.AddTool(tool, helloHandler)

    // 启动标准输入输出服务器
    if err := server.ServeStdio(s); err != nil {
       fmt.Printf("Server error: %v\n", err) // 打印服务器错误
    }
}

func helloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // 从请求参数中获取名字参数,并断言为字符串类型
    name, ok := request.Params.Arguments["name"].(string)
    if !ok {
       // 如果断言失败,返回错误
       return nil, errors.New("name must be a string")
    }

    // 返回包含问候语的结果
    return mcp.NewToolResultText(fmt.Sprintf("Hello, %s!", name)), nil
}

 

3.1 搭建一个联调环境

就以上面的代码为例,将上面的代码编译为二进制命令:
$ go build -v -o server

再启动mcp inspetor:

$ npx -y @modelcontextprotocol/inspector ./server

这样一个简单的MCP Client和MCP Server就搭建好了,后续也为我们开发测试构建好了环境。

四、源码解读

在上面的代码中main函数中的第一个代码就是server.NewMCPServer,那我们就从MCPServer这个结构体入手。

4.1 MCPServer结构体

代码地址:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/server.go#L135
type MCPServer struct {
    mu                   sync.RWMutex                       // 用于保护共享资源,确保并发访问时的数据一致性
    name                 string                             // 服务器的名称,用于标识服务器
    version              string                             // 服务器的版本,用于跟踪和管理服务器的不同版本
    instructions         string                             // 服务器的指令,通常在初始化响应中返回给客户端,提供使用指南或帮助信息
    resources            map[string]resourceEntry           // 存储服务器支持的资源及其处理函数
    resourceTemplates    map[string]resourceTemplateEntry   // 存储资源模板及其处理函数,支持URI模板匹配多类似资源
    prompts              map[string]mcp.Prompt              // 存储服务器支持的提示,用于与用户交互
    promptHandlers       map[string]PromptHandlerFunc       // 存储处理提示请求的函数,每个提示对应一个处理函数
    tools                map[string]ServerTool              // 存储服务器支持的工具及其处理函数
    notificationHandlers map[string]NotificationHandlerFunc // 存储处理传入通知的函数,接收客户端通知并处理
    capabilities         serverCapabilities                 // 定义服务器支持的功能特性,包括资源、提示、工具和日志记录等
    sessions             sync.Map                           // 存储当前活跃的客户端会话,用于跟踪用户交互
    initialized          atomic.Bool                        // 使用原子操作标记服务器是否已初始化,确保线程安全
    hooks                *Hooks                             // 存储服务器钩子,允许在请求处理前后或返回错误前执行自定义逻辑
}

MCPServer 是 Model Control Protocol (MCP) 服务器的实现,用于处理包括资源、提示和工具在内的各种类型的请求。

     

4.2 MCPServer初始化

看看如何创建一个MCPServer对象。
func NewMCPServer(
    name, version string,
    opts ...ServerOption,
) *MCPServer {
    s := &MCPServer{
       resources:            make(map[string]resourceEntry),
       resourceTemplates:    make(map[string]resourceTemplateEntry),
       prompts:              make(map[string]mcp.Prompt),
       promptHandlers:       make(map[string]PromptHandlerFunc),
       tools:                make(map[string]ServerTool),
       name:                 name,
       version:              version,
       notificationHandlers: make(map[string]NotificationHandlerFunc),
       capabilities: serverCapabilities{
          tools:     nil,
          resources: nil,
          prompts:   nil,
          logging:   false,
       },
    }

    for _, opt := range opts {
       opt(s)
    }

    return s
}

在NewMCPServer()方法中,我们需要关注的opts ...ServerOption,进一步看看ServerOption有哪些选项:

选项 功能 使用方式
WithResourceCapabilities 配置资源相关的服务器功能,如订阅和资源列表变化通知 WithResourceCapabilities(subscribe, listChanged bool)
WithHooks 添加钩子函数,用于在请求处理前后执行特定逻辑 WithHooks(hooks *Hooks)
WithPromptCapabilities 配置提示相关的服务器功能,如提示列表变化通知 WithPromptCapabilities(listChanged bool)
WithToolCapabilities 配置工具相关的服务器功能,如工具列表变化通知 WithToolCapabilities(listChanged bool)
WithLogging 启用服务器日志记录功能 WithLogging()
WithInstructions 设置服务器指令,用于在初始化响应中返回给客户端 WithInstructions(instructions string)
它接受一个 Hooks 类型的指针作为参数。允许在创建 MCPServer 实例时,为服务器添加自定义的钩子函数,这些钩子函数可以在请求处理前后或返回错误给客户端之前执行。
hooks机制对开发和流程是非常有效的。框架中给的hooks能力有:
字段名 类型 描述
OnBeforeAny []BeforeAnyHookFunc 在请求被解析后但方法调用前执行的钩子函数。
OnSuccess []OnSuccessHookFunc 在请求成功生成结果但结果尚未发送给客户端之前执行的钩子函数。
OnError []OnErrorHookFunc 在请求解析或方法执行过程中发生错误时执行的钩子函数。
OnBeforeInitialize []OnBeforeInitializeFunc 在处理初始化请求前执行的钩子函数。
OnAfterInitialize []OnAfterInitializeFunc 在处理初始化请求后执行的钩子函数。
OnBeforePing []OnBeforePingFunc 在处理 Ping 请求前执行的钩子函数。
OnAfterPing []OnAfterPingFunc 在处理 Ping 请求后执行的钩子函数。
OnBeforeListResources []OnBeforeListResourcesFunc 在处理列出资源请求前执行的钩子函数。
OnAfterListResources []OnAfterListResourcesFunc 在处理列出资源请求后执行的钩子函数。
OnBeforeListResourceTemplates []OnBeforeListResourceTemplatesFunc 在处理列出资源模板请求前执行的钩子函数。
OnAfterListResourceTemplates []OnAfterListResourceTemplatesFunc 在处理列出资源模板请求后执行的钩子函数。
OnBeforeReadResource []OnBeforeReadResourceFunc 在处理读取资源请求前执行的钩子函数。
OnAfterReadResource []OnAfterReadResourceFunc 在处理读取资源请求后执行的钩子函数。
OnBeforeListPrompts []OnBeforeListPromptsFunc 在处理列出提示请求前执行的钩子函数。
OnAfterListPrompts []OnAfterListPromptsFunc 在处理列出提示请求后执行的钩子函数。
OnBeforeGetPrompt []OnBeforeGetPromptFunc 在处理获取提示请求前执行的钩子函数。
OnAfterGetPrompt []OnAfterGetPromptFunc 在处理获取提示请求后执行的钩子函数。
OnBeforeListTools []OnBeforeListToolsFunc 在处理列出工具请求前执行的钩子函数。
OnAfterListTools []OnAfterListToolsFunc 在处理列出工具请求后执行的钩子函数。
OnBeforeCallTool []OnBeforeCallToolFunc 在处理调用工具请求前执行的钩子函数。
OnAfterCallTool []OnAfterCallToolFunc 在处理调用工具请求后执行的钩子函数。
现在模拟创建一个完整的MCPServer:
hooks := &server.Hooks{}

hooks.AddBeforeAny(func(id any, method mcp.MCPMethod, message any) {
    fmt.Printf("beforeAny: %s, %v, %v\n", method, id, message)
})
hooks.AddOnSuccess(func(id any, method mcp.MCPMethod, message any, result any) {
    fmt.Printf("onSuccess: %s, %v, %v, %v\n", method, id, message, result)
})
hooks.AddOnError(func(id any, method mcp.MCPMethod, message any, err error) {
    fmt.Printf("onError: %s, %v, %v, %v\n", method, id, message, err)
})
hooks.AddBeforeInitialize(func(id any, message *mcp.InitializeRequest) {
    fmt.Printf("beforeInitialize: %v, %v\n", id, message)
})
hooks.AddAfterInitialize(func(id any, message *mcp.InitializeRequest, result *mcp.InitializeResult) {
    fmt.Printf("afterInitialize: %v, %v, %v\n", id, message, result)
})
hooks.AddAfterCallTool(func(id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) {
    fmt.Printf("afterCallTool: %v, %v, %v\n", id, message, result)
})
hooks.AddBeforeCallTool(func(id any, message *mcp.CallToolRequest) {
    fmt.Printf("beforeCallTool: %v, %v\n", id, message)
})
// 创建MCP服务器
s := server.NewMCPServer(
    "Demo 🚀", // 服务器名称
    "1.0.0",  // 服务器版本
    server.WithLogging(),
    server.WithToolCapabilities(true),
    server.WithResourceCapabilities(true, true),
    server.WithPromptCapabilities(true),
    server.WithInstructions("initialized"),
    server.WithHooks(hooks),
)

 

4.3 Tools模块

4.3.1 创建Tool

mark3labs/mcp-go框架中创建Tool有两个方式:
  1. mcp.NewTool(name string, opts ...ToolOption) Tool
  2. mcp.NewToolWithRawSchema(name, description string, schema json.RawMessage) Tool
下面通过一段代码进行对比:
方式一:mcp.NewTool()
tool := mcp.NewTool("hello_world", // 工具名称
    mcp.WithDescription("Say hello to someone"), // 工具描述
    mcp.WithString("name", // 参数名称
       mcp.Required(), // 参数是必需的
       mcp.Description("Name of the person to greet"), // 参数描述
    ),
)

方式二:mcp.NewToolWithRawSchema()
rawSchema := json.RawMessage(`{
    "type": "object",
    "properties": {
       "name": {"type": "string", "description": "Name of the person to greet"}
    },
    "required": ["name"]
}`)

// Create a tool with raw schema
toolRS := mcp.NewToolWithRawSchema("hello_world_1", "Say hello to someone", rawSchema)

其中rawSchema的结构需要符合ToolInputSchema结构体:

type ToolInputSchema struct {
    Type       string                 `json:"type"`
    Properties map[string]interface{} `json:"properties,omitempty"`
    Required   []string               `json:"required,omitempty"`
}

需要注意的是Properties来源于jsonSchema,所以具备的需要校验属性,比如default、maximum、minimum、maxLength、minLength、enum等等。其中key为请求传入的参数字段,interface{}为对key的各种属性校验。

更建议使用方式一:mcp.NewTool()更加符合编码方式,也更好控制器生成的 jsonSchema

4.3.2 存放Tool

创建好Tool值之后,调用server的方法AddTool()将Tool添加进入,相当于web框架中的添加路由与相关handle的关系。相关代码如下:
// AddTool registers a new tool and its handler
func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) {
    s.AddTools(ServerTool{Tool: tool, Handler: handler})
}

// AddTools registers multiple tools at once
func (s *MCPServer) AddTools(tools ...ServerTool) {
    // 检查工具
    if s.capabilities.tools == nil {
       s.capabilities.tools = &toolCapabilities{}
    }

    // 加锁
    s.mu.Lock()
    // 遍历工具
    for _, entry := range tools {
       s.tools[entry.Tool.Name] = entry
    }
    // 获取初始化状态
    initialized := s.initialized.Load()
    s.mu.Unlock()

    // 发送通知
    if initialized {
       s.sendNotificationToAllClients("notifications/tools/list_changed", nil)
    }
}

可以看出Tool是放入到MCPServer的tools字段中,使用name作为key,ServerTool作为value,其中ServerTool结构如下:

type ServerTool struct {
    Tool    mcp.Tool
    Handler ToolHandlerFunc
}

框架还提供了:

  • DeleteTools(names... string)作为删除Tool关联的方法。
  • SetTools(tools ...ServerTool)可以设置当前所有的Tool 列表。

4.4 Resource模块

老规矩先来一个demo,再看器源码实现:
// Static resource example - exposing a README file
resource := mcp.NewResource(
    "docs://readme",
    "Project README",
    mcp.WithResourceDescription("The project's README file"),
    mcp.WithMIMEType("text/markdown"),
)

// Add resource with its handler
s.AddResource(resource, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    content, err := os.ReadFile("main.go")
    if err != nil {
       return nil, err
    }

    return []mcp.ResourceContents{
       mcp.TextResourceContents{
          URI:      "docs://readme",
          MIMEType: "text/markdown",
          Text:     string(content),
       },
    }, nil
})

 

在MCP中Resource是指允许Server公开可供客户端读取并用作交互上下文的数据和内容。有很多类型的Resource:
  • File contents 文件内容
  • Database records 数据库记录
  • API responses API响应
  • Live system data 实时系统数据
  • images 图像
  • Log files 日志文件
  • 等等... ...
框架中Resource的结构体如下:

type Resource struct {
    Annotated // 包含可选注解,用于告知客户端如何使用或显示对象

    // 资源的URI
    URI string `json:"uri"` // 资源的唯一标识符,用于定位和访问资源

    // 资源的可读名称
    //
    // 客户端可以使用此名称来填充UI元素
    Name string `json:"name"` // 资源的显示名称,便于用户理解和界面展示

    // 对此资源所代表内容的描述
    //
    // 客户端可以使用此描述来帮助大型语言模型(LLM)理解可用资源
    // 这可以看作是对模型的“提示”
    Description string `json:"description,omitempty"` // 资源的详细描述,为模型提供上下文信息

    // 如果已知,此资源的MIME类型
    MIMEType string `json:"mimeType,omitempty"` // 资源的媒体类型,如text/plain、image/jpeg等,用于指示资源的内容格式
}

type Annotated struct {
    Annotations *struct {
       // 描述此对象或数据的预期客户是谁
       //
       // 它可以包含多个条目,以指示对多个受众有用的内容(例如,`["user", "assistant"]`)
       Audience []Role `json:"audience,omitempty"` // 受众群体,指示数据对哪些角色或用户群体有用

       // 描述此数据对服务器操作的重要性
       //
       // 值为1表示“最重要”,并指示数据实际上是必需的,而0表示“最不重要”,并指示数据完全是可选的
       Priority float64 `json:"priority,omitempty"` // 优先级,表示数据对服务器操作的重要程度,范围从0(最不重要)到1(最重要)
    } `json:"annotations,omitempty"`
}

其中:

  • URI用于定位一个具体资源的标识,场景的有http://、file://、postgres://等等还可以去https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml 看看。
  • MIMEType表示文件的类型,常见的有 text/html、image/png,这里也可以查看到更多:https://www.iana.org/assignments/media-types/media-types.xhtml。
func (s *MCPServer) AddResource(
    resource mcp.Resource,
    handler ResourceHandlerFunc,
) {
    // 检查资源
    if s.capabilities.resources == nil {
       s.capabilities.resources = &resourceCapabilities{}
    }

    // 加锁
    s.mu.Lock()
    // 解锁(defer)
    defer s.mu.Unlock()

    // 存储资源
    s.resources[resource.URI] = resourceEntry{
       resource: resource,
       handler:  handler,
    }
}

resource是放入到MCPServer的resources字段中,使用URI作为key,resourceEntry作为value,其中resourceEntry结构如下:

type resourceEntry struct {
    resource mcp.Resource
    handler  ResourceHandlerFunc
}

框架还提供了添加Resource templates的功能,主要是针对动态资源,服务器可以公开 URI 模板 ,客户端可以使用它来构建有效的资源 URI。

// Dynamic resource example - user profiles by ID
template := mcp.NewResourceTemplate(
    "users://{id}/profile",
    "User Profile",
    mcp.WithTemplateDescription("Returns user profile information"),
    mcp.WithTemplateMIMEType("application/json"),
)

// Add template with its handler
s.AddResourceTemplate(template, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
    // Extract ID from the URI using regex matching
    // The server automatically matches URIs to templates
    userID := extractIDFromURI(request.Params.URI)

    profile := fmt.Sprintf("Hello %s", userID) // Your DB/API call here

    return []mcp.ResourceContents{
       mcp.TextResourceContents{
          URI:      request.Params.URI,
          MIMEType: "application/json",
          Text:     profile,
       },
    }, nil
})

// extractIDFromURI 从给定的 URI 中提取用户 ID。
// 假设 URI 的格式为 "users://{id}/profile"。
func extractIDFromURI(uri string) string {
    // 定义正则表达式来匹配 URI 中的 ID
    re := regexp.MustCompile(`users://([^/]+)/profile`)

    // 使用正则表达式查找匹配项
    matches := re.FindStringSubmatch(uri)

    // 如果找到了匹配项,并且匹配项的数量正确,则返回 ID
    if len(matches) == 2 {
       return matches[1]
    }

    // 如果没有找到匹配项,或者匹配项的数量不正确,则返回空字符串
    return ""
}

 

4.5 Prompts添加

Prompts创建可重复使用的提示模板和工作流程,提示使服务器能够定义可重复使用的提示模板和工作流程。
// Simple greeting prompt
s.AddPrompt(mcp.NewPrompt("greeting",
    mcp.WithPromptDescription("A friendly greeting prompt"),
    mcp.WithArgument("name",
       mcp.ArgumentDescription("Name of the person to greet"),
    ),
), func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
    name := request.Params.Arguments["name"]
    if name == "" {
       name = "friend"
    }

    return mcp.NewGetPromptResult(
       "A friendly greeting",
       []mcp.PromptMessage{
          mcp.NewPromptMessage(
             mcp.RoleAssistant,
             mcp.NewTextContent(fmt.Sprintf("Hello, %s! How can I help you today?", name)),
          ),
       },
    ), nil
})

 

Prompt的结构体如下:
// Prompt 表示服务器提供的提示或提示模板。
// 如果 Arguments 非空且包含元素,则表示该提示是一个模板,
// 在调用 prompts/get 时需要提供参数值。
// 如果 Arguments 为空或为 nil,则这是一个不需要参数的静态提示。
type Prompt struct {
    // 提示或提示模板的名称。
    Name string `json:"name"`
    // 提示提供内容的可选描述。
    Description string `json:"description,omitempty"`
    // 用于模板化提示的参数列表。
    // 参数的存在表明这是一个模板提示。
    Arguments []PromptArgument `json:"arguments,omitempty"`
}

// PromptArgument 描述提示模板可以接受的参数。
// 当提示包含参数时,客户端在发出 prompts/get 请求时
// 必须为所有必需参数提供值。
type PromptArgument struct {
    // 参数的名称。
    Name string `json:"name"`
    // 参数的可读描述。
    Description string `json:"description,omitempty"`
    // 此参数是否必须提供。
    // 如果为 true,则客户端在调用 prompts/get 时必须包含此参数。
    Required bool `json:"required,omitempty"`
}

可以通过mcp.NewPrompt方法来生成Prompt对象:

func NewPrompt(name string, opts ...PromptOption) Prompt {
    prompt := Prompt{
       Name: name,
    }

    for _, opt := range opts {
       opt(&prompt)
    }

    return prompt
}

又见到了opts ...PromptOption,看看有哪些选项:

  • WithPromptDescription用于设置description;
  • WithArgument用于设置arguments;

4.6 选择传输方式

现在代码已经来到了:
// 启动标准输入输出服务器
if err := server.ServeStdio(s); err != nil {
    fmt.Printf("Server error: %v\n", err) // 打印服务器错误
}

MCP当前主要提供两类Stdio transport和HTTP with SSE transport 。

4.6.1 Stdio

创建StdioServer

StdioServer的结构体:
type StdioServer struct {
    server      *MCPServer
    errLogger   *log.Logger
    contextFunc StdioContextFunc
}

其中在,重要的字段有server和contextFunc,给MCPServer进来就是为了具备MCP的能力,只是使用stdio的传输方式。contextFunc是为了让外部自定义的context可以进入到StdioServer,可以用于结束服务和控制超时.


// 为MCP Server 启用stdio
func ServeStdio(server *MCPServer, opts ...StdioOption) error {
    // 创建Stdio服务器
    s := NewStdioServer(server)
    // 设置错误日志
    s.SetErrorLogger(log.New(os.Stderr, "", log.LstdFlags))

    // 应用选项
    for _, opt := range opts {
       opt(s)
    }

    // 创建上下文
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 设置信号通道
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

    // 监听信号
    go func() {
       <-sigChan
       cancel()
    }()

    // 开始监听
    return s.Listen(ctx, os.Stdin, os.Stdout)
}

其中最关键的是Listen方法,它是 StdioServer 类型的一个关键方法,用于监听标准输入输出的 JSON-RPC 消息。

// Listen 启动监听,从提供的输入读取 JSON-RPC 消息,并将响应写入提供的输出。
// 它将持续运行,直到上下文被取消或发生错误。
// 如果在读取输入或写入输出时遇到问题,将返回错误。
func (s *StdioServer) Listen(
    ctx context.Context, // 监听过程的上下文,用于控制生命周期和传递请求范围的信息
    stdin io.Reader, // 标准输入流,用于读取客户端发送的 JSON-RPC 消息
    stdout io.Writer, // 标准输出流,用于将响应写回客户端
) error {
    // 由于标准输入输出只有一个客户端,因此设置一个静态客户端上下文,SessionId为stdio
    if err := s.server.RegisterSession(&stdioSessionInstance); err != nil {
       // 如果会话注册失败,返回错误
       return fmt.Errorf("register session: %w", err)
    }
    // 确保在函数结束时注销会话
    defer s.server.UnregisterSession(stdioSessionInstance.SessionID())
    // 更新上下文,将会话信息加入
    ctx = s.server.WithContext(ctx, &stdioSessionInstance)

    // 如果存在自定义上下文函数,则应用该函数修改上下文
    if s.contextFunc != nil {
       ctx = s.contextFunc(ctx)
    }

    // 创建一个带缓冲的读取器,用于从标准输入流高效读取数据
    reader := bufio.NewReader(stdin)

    // 启动一个协程专门处理通知
    go func() {
       for {
          select {
          case notification := <-stdioSessionInstance.notifications:
             // 收到通知时,调用 writeResponse 方法将通知写入标准输出
             err := s.writeResponse(notification, stdout)
             if err != nil {
                // 如果写入通知时出错,记录错误日志
                s.errLogger.Printf("Error writing notification: %v", err)
             }
          case <-ctx.Done():
             // 如果上下文完成,退出协程
             return
          }
       }
    }()

    // 主循环,用于处理输入消息
    for {
       select {
       case <-ctx.Done():
          // 如果上下文完成,返回上下文错误
          return ctx.Err()
       default:
          // 使用协程使读取操作可取消
          readChan := make(chan string, 1) // 用于接收读取到的行
          errChan := make(chan error, 1)   // 用于接收读取错误

          go func() {
             line, err := reader.ReadString('\n') // 读取一行输入
             if err != nil {
                errChan <- err // 发送读取错误
                return
             }
             readChan <- line // 发送读取到的行
          }()

          select {
          case <-ctx.Done():
             // 如果上下文完成,返回上下文错误
             return ctx.Err()
          case err := <-errChan:
             // 处理读取错误
             if err == io.EOF {
                // 如果是文件结束符,表示输入结束,返回 nil
                return nil
             }
             // 其他错误则记录日志并返回
             s.errLogger.Printf("Error reading input: %v", err)
             return err
          case line := <-readChan:
             // 处理读取到的行
             if err := s.processMessage(ctx, line, stdout); err != nil {
                // 如果处理消息时出错
                if err == io.EOF {
                   // 如果是文件结束符,返回 nil
                   return nil
                }
                // 其他错误则记录日志并返回
                s.errLogger.Printf("Error handling message: %v", err)
                return err
             }
          }
       }
    }
}

 

4.6.2 SSE

SSE模式凭借其分布式能力、实时性和架构灵活性,成为MCP在 企业级应用、云端协作、动态数据流处理等场景的首选。所以对SSE的支持程度和易用程度,对于一个MCP框架而言是非常重要的。
mark3labs/mcp-go如何支持SSE呢?看看如下代码:
mcpServer := NewMCPServer()
sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080"))
log.Printf("SSE server listening on :8080")
if err := sseServer.Start(":8080"); err != nil {
    log.Fatalf("Server error: %v", err)
}

现在已经知道如何在mark3labs/mcp-go中启用SSE了,现在来分析一下,它是如何实现的。

创建SSEServer

SSEServer的结构体如下:
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
// It provides real-time communication capabilities over HTTP using the SSE protocol.
type SSEServer struct {
    server          *MCPServer       // MCPServer 实例,用于处理实际的消息传递和通信逻辑
    baseURL         string           // SSE 服务器的基础 URL,用于构建完整的端点路径
    basePath        string           // SSE 服务器的基础路径,通常用于区分不同的服务或版本
    messageEndpoint string           // 消息端点的路径,客户端通过此端点发送 JSON-RPC 消息
    sseEndpoint     string           // SSE 端点的路径,客户端通过此端点建立 SSE 连接
    sessions        sync.Map         // 存储活动 SSE 会话的同步映射,用于跟踪和管理客户端连接
    srv             *http.Server     // 内部的 HTTP 服务器实例,用于处理 HTTP 请求和响应
    contextFunc     SSEContextFunc   // 可选的上下文函数,用于根据请求内容自定义上下文
}

与之前的StdioServer相比,SSEServer多了用于http中使用的URI、path,还有srv这是一个httpServer的类型,用于支持HTTP请求和响应。继续查看如何构建SSEServer:

// NewSSEServer creates a new SSE server instance with the given MCP server and options.
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
    s := &SSEServer{
       server:          server,
       sseEndpoint:     "/sse",
       messageEndpoint: "/message",
    }

    // Apply all options
    for _, opt := range opts {
       opt(s)
    }

    return s
}

还是采用的才是经典Option模式,继续看看SSEOption有哪些选项:

名称 功能 使用方式
WithBaseURL 设置 SSE 服务器的基础 URL WithBaseURL("https://example.com")
WithBasePath 设置 SSE 服务器的基础路径 WithBasePath("/v1")
WithMessageEndpoint 设置消息端点的路径 WithMessageEndpoint("/custom-message")
WithSSEEndpoint 设置 SSE 端点的路径 WithSSEEndpoint("/custom-sse")
WithHTTPServer 设置 HTTP 服务器实例(通常用于测试或自定义服务器配置) WithHTTPServer(customHttpServer)
WithContextFunc 设置一个函数,用于根据请求内容自定义上下文 WithContextFunc(func(ctx context.Context, r *http.Request) context.Context { ... })
WithHTTPServer适用于需要自定义服务器配置的场景,为后续替换更好性能的http服务实例打下基础,也体现了mark3labs/mcp-go扩展性。
剩下的Start()就是常见的启动http服务实例的功能了。
func (s *SSEServer) Start(addr string) error {
    s.srv = &http.Server{
       Addr:    addr,
       Handler: s,
    }

    return s.srv.ListenAndServe()
}

还可以调用Shutdown()实现对服务器的关闭。

4.6.3 集成到gin框架

在实际开发中,很多公司内部的业务有自己的框架,集成了许许多多的独特功能。总不能为了使用MCP重写一套Web框架,此时就需要使用到mark3labs/mcp-go集成到Web框架的能力了。下面以gin框架为例:
// 创建一个新的 Gin 引擎
r := gin.Default()

// 创建一个新的 MCPServer 实例(假设这是 SSEServer 所需的)
mcpServer := server.NewMCPServer("gin-mcp-server", "1.0.0") // 根据你的实际代码调整
// mcpServer 新加Tool、Resource、Prompt
// ... ...
// 创建一个新的 SSEServer 实例,并传入 MCPServer
sseServer := server.NewSSEServer(mcpServer)

// 将 SSEServer 的 SSE 端点和处理函数集成到 Gin 路由中
r.GET(sseServer.CompleteSsePath(), func(c *gin.Context) {
    sseServer.ServeHTTP(c.Writer, c.Request)
})

// 将 SSEServer 的消息端点和处理函数集成到 Gin 路由中
r.POST(sseServer.CompleteMessagePath(), func(c *gin.Context) {
    sseServer.ServeHTTP(c.Writer, c.Request)
})

// 启动 Gin 服务器
if err := r.Run("localhost:8081"); err != nil {
    log.Fatalf("Gin server startup failed: %v", err)
}

上述的代码会生成两个路由:

路由
方法
作用
实例
/sse
GET
  • 获取SessionID
  • 接受Server响应
请求:
curl 'http://localhost:3000/sse?transportType=sse&url=http%3A%2F%2Flocalhost%3A8081%2Fsse' \
  -H 'Accept: */*' \
  -H 'Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7' \
  -H 'Cache-Control: no-cache' \
  -H 'Connection: keep-alive' \
  -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' 
  
  响应:
  event: endpoint
data: /message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766

 

/message
POST
  • 使用SessionID保持会话
  • 发起功能请求
请求:
curl 'http://localhost:3000/message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766' \
  -H 'Accept: */*' \
  -H 'Cache-Control: no-cache' \
  -H 'Connection: keep-alive' \
  -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' \
  -H 'content-type: application/json' 
  --data-raw '{"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"sampling":{},"roots":{"listChanged":true}},"clientInfo":{"name":"mcp-inspector","version":"0.7.0"}},"jsonrpc":"2.0","id":0}'
 
/message的响应:
{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}}

/sse收到的响应:
event: message
data: {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}}

 

4.7 处理请求

之前的内容,解析了如何构建MCP Server的实践和背后的实现。下面我们还需要了解mark3labs/mcp-go如何接受请求并进行响应的。

4.7.1 路由入口

从SSEServer结构体中已经知道使用的http.Server,所以其接受请求的入口为ServeHTTP方法,实现了 http.Handler 接口,用于处理 HTTP 请求。根据请求的路径,它会将请求分发到不同的处理方法(handleSSE 或 handleMessage),或者返回 404 未找到。
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 获取请求的路径
    path := r.URL.Path

    // 使用精确路径匹配,而不是模糊包含
    ssePath := s.CompleteSsePath() // 获取完整的 SSE 路径
    if ssePath != "" && path == ssePath {
       // 如果请求路径与 SSE 路径匹配,则处理 SSE 请求
       s.handleSSE(w, r)
       return // 处理完成后直接返回,不再继续后续逻辑
    }

    // 获取消息处理的完整路径
    messagePath := s.CompleteMessagePath()
    if messagePath != "" && path == messagePath {
       // 如果请求路径与消息处理路径匹配,则处理消息请求
       s.handleMessage(w, r)
       return // 处理完成后直接返回,不再继续后续逻辑
    }

    // 如果请求路径不匹配任何已知路径,则返回 404 未找到
    http.NotFound(w, r)
}

其中:

  • SSE路径为:s.baseURL + s.basePath + s.sseEndpoint
  • Message路径为:s.baseURL + s.basePath + s.messageEndpoint
这些信息都是可以在NewSSEServer()方法中设置。
需要详细查看的是 s.handleSSE(w, r)和s.handleMessage(w, r)方法,他们分别处理/see和/message的请求。

4.7.2 handleSSE

handleSSE实现了一个处理服务器发送事件(SSE)的HTTP处理器。SSE是一种允许服务器向客户端发送自动更新的技术。主要流程:
  1. 请求方法检查:只允许GET请求。
  2. 设置响应头:设置适当的SSE响应头。
  3. 创建会话:为每个客户端创建一个新的SSE会话。
  4. 注册和注销会话:在服务器中注册会话,并在处理完成后注销。
  5. 通知处理器:启动一个goroutine处理来自通知通道的事件,并将其发送到客户端。
  6. 主事件循环:处理来自事件队列的事件,并将其发送到客户端。
// 它设置适当的头信息并为客户端创建一个新的会话。
func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
    // 1. 检查请求方法是否为GET,如果不是,返回405 Method Not Allowed错误
    if r.Method != http.MethodGet {
       http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
       return
    }

    // 2. 设置SSE相关的响应头
    w.Header().Set("Content-Type", "text/event-stream") // 设置内容类型为text/event-stream
    w.Header().Set("Cache-Control", "no-cache")         // 禁用缓存
    w.Header().Set("Connection", "keep-alive")          // 保持连接活跃
    w.Header().Set("Access-Control-Allow-Origin", "*")  // 允许所有域跨域请求

    // 检查ResponseWriter是否支持Flush,如果不支持,返回500 Internal Server Error错误
    flusher, ok := w.(http.Flusher)
    if !ok {
       http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
       return
    }

    // 3. 创建一个新的会话ID和会话对象
    sessionID := uuid.New().String() // 生成唯一的会话ID
    session := &sseSession{
       writer:              w,                          // 响应写入器
       flusher:             flusher,                    // 刷新器
       done:                make(chan struct{}),        // 用于通知会话结束的通道
       eventQueue:          make(chan string, 100),     // 事件队列,缓冲区大小为100
       sessionID:           sessionID,                  // 会话ID
       notificationChannel: make(chan mcp.JSONRPCNotification, 100), // 通知通道,缓冲区大小为100
    }

    // 4. 将会话存储到会话存储中,并在处理完成后删除
    s.sessions.Store(sessionID, session)
    defer s.sessions.Delete(sessionID)

    // 在服务器中注册会话,如果注册失败,返回500 Internal Server Error错误
    if err := s.server.RegisterSession(session); err != nil {
       http.Error(w, fmt.Sprintf("Session registration failed: %v", err), http.StatusInternalServerError)
       return
    }
    // 在处理完成后注销会话
    defer s.server.UnregisterSession(sessionID)

    // 5. 启动一个goroutine处理通知通道中的事件
    go func() {
       for {
          select {
          case notification := <-session.notificationChannel: // 从通知通道接收通知
             eventData, err := json.Marshal(notification) // 将通知序列化为JSON
             if err == nil {
                select {
                case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 将事件发送到事件队列
                   // 事件成功入队
                case <-session.done: // 如果会话结束,退出goroutine
                   return
                }
             }
          case <-session.done: // 如果会话结束,退出goroutine
             return
          case <-r.Context().Done(): // 如果请求上下文被取消,退出goroutine
             return
          }
       }
    }()

    // 生成消息端点URL并发送初始的endpoint事件
    messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID)
    fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) // 发送endpoint事件
    flusher.Flush() // 刷新响应,确保事件立即发送到客户端

    // 6. 主事件循环,运行在HTTP处理器goroutine中
    for {
       select {
       case event := <-session.eventQueue: // 从事件队列接收事件
          fmt.Fprint(w, event) // 将事件写入响应
          flusher.Flush()      // 刷新响应,确保事件立即发送到客户端
       case <-r.Context().Done(): // 如果请求上下文被取消,关闭会话并退出
          close(session.done)
          return
       }
    }
}

第三阶段又看到Session了,与之前的stdioSession相比,sseSession明显复杂多了,它们都实现接口ClientSession:

type ClientSession interface {
    // NotificationChannel provides a channel suitable for sending notifications to client.
    NotificationChannel() chan<- mcp.JSONRPCNotification
    // SessionID is a unique identifier used to track user session.
    SessionID() string
}

sseSession是专门用于表示一个基于服务器发送事件(Server-Sent Events, SSE)协议的活跃连接。sseSession负责管理客户端与服务器之间的单向实时数据推送,并保持会话。其结构体如下:

type sseSession struct {
    writer              http.ResponseWriter // HTTP响应写入器,用于向客户端发送数据
    flusher             http.Flusher       // HTTP刷新器,用于刷新响应缓冲区,确保数据立即发送给客户端
    done                chan struct{}      // 用于通知会话结束的通道
    eventQueue          chan string        // 用于排队事件的通道,存储待发送给客户端的事件
    sessionID           string             // 会话的唯一标识符
    notificationChannel chan mcp.JSONRPCNotification // 用于接收JSON-RPC通知的通道
}

4.7.3 handleMessage

handleMessage 方法是 SSEServer 类型的一个方法,用于处理传入的 JSON-RPC 消息,并通过 SSE 连接和 HTTP 响应返回结果。其主要流程:
  1. 请求验证:检查请求方法
    1. 检查请求方法:方法首先检查请求方法是否为 HTTP POST。如果不是,返回 "Method not allowed" 错误,并终止处理。
    2. 验证 sessionId 参数:从请求的 URL 查询参数中获取 sessionId。如果缺失,返回 "Missing sessionId" 错误。
    3. 加载会话:使用 sessionId 从会话存储中加载会话。如果会话不存在,返回 "Invalid session ID" 错误。
  2. 设置上下文
    1. 调用 s.server.WithContext 方法,将请求上下文和会话信息合并,生成新的上下文。如果提供了 contextFunc,则进一步处理上下文。
  3. 解析 JSON-RPC 消息
    1. 使用 json.NewDecoder 解析请求体中的原始 JSON 消息。如果解析失败,返回 "Parse error" 错误。
  4. 处理消息
    1. 将解析后的消息传递给 s.server.HandleMessage 方法进行处理,并获取响应结果。
  5. 发送响应
      如果 HandleMessage 返回了响应(非通知),则:
      如果 HandleMessage 没有返回响应(通知),则仅设置 HTTP 响应状态码为 202 Accepted,不发送响应体。
    1. 将响应编码为 JSON 格式。
    2. 将响应事件加入会话的事件队列,供 SSE 连接发送。
    3. 设置 HTTP 响应头为 application/json,状态码为 202 Accepted,并发送响应体。
  6. 事件队列处理
尝试将事件加入会话的事件队列。如果队列已满或会话已关闭,则丢弃事件。
那么handleSSE和handleMessage的关系是怎样的呢?使用一张图来说明:
 
// handleMessage 处理来自客户端的JSON-RPC消息,并通过SSE连接和HTTP响应返回结果
func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
    // 1. 如果请求方法不是POST,则返回JSON-RPC错误响应,表示方法不允许
    if r.Method != http.MethodPost {
       s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
       return
    }

    // 从请求的URL查询参数中获取sessionId
    sessionID := r.URL.Query().Get("sessionId")
    // 如果sessionId为空,则返回JSON-RPC错误响应,表示缺少sessionId参数
    if sessionID == "" {
       s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
       return
    }

    // 从session存储中加载与sessionId对应的session
    sessionI, ok := s.sessions.Load(sessionID)
    // 如果session不存在,则返回JSON-RPC错误响应,表示无效的session ID
    if !ok {
       s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
       return
    }
    session := sessionI.(*sseSession)

    // 在处理消息之前设置客户端上下文
    ctx := s.server.WithContext(r.Context(), session)
    // 如果提供了自定义的上下文函数,则应用它
    if s.contextFunc != nil {
       ctx = s.contextFunc(ctx, r)
    }

    // 将请求体解析为原始JSON消息
    var rawMessage json.RawMessage
    if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
       // 如果解析失败,则返回JSON-RPC错误响应,表示解析错误
       s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
       return
    }

    // 通过MCPServer处理消息
    response := s.server.HandleMessage(ctx, rawMessage)

    // 如果存在响应(非通知),则发送响应
    if response != nil {
       // 将响应编码为JSON格式
       eventData, _ := json.Marshal(response)

       // 将事件排队以通过SSE发送
       select {
       case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
          // 事件成功排队
       case <-session.done:
          // 会话已关闭,不尝试排队
       default:
          // 队列已满,可以记录此情况
       }

       // 发送HTTP响应
       w.Header().Set("Content-Type", "application/json")
       w.WriteHeader(http.StatusAccepted)
       json.NewEncoder(w).Encode(response)
    } else {
       // 对于通知,只发送202 Accepted状态码,无响应体
       w.WriteHeader(http.StatusAccepted)
    }
}

 

需要注意的是HandleMessage方法,这是通过server/internal/gen/request_handler.go.tmpl生成的,也根据MCP协议实现的模版,其流程如下:
  1. 尝试将原始消息解析为对应请求类型。
  2. 如果解析失败,记录错误信息。
  3. 执行请求前的钩子函数。
  4. 调用对应的处理函数处理请求。
  5. 如果处理过程中发生错误,执行错误钩子函数并返回错误响应。
  6. 执行请求后的钩子函数。
  7. 返回成功响应。
以initialize请求为例:
// 根据消息方法进行分情况处理
switch baseMessage.Method {
// 初始化请求处理
case mcp.MethodInitialize:
    var request mcp.InitializeRequest
    var result *mcp.InitializeResult
    // 尝试将原始消息解析为初始化请求类型
    if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil {
       // 如果解析失败,记录错误信息
       err = &requestError{
          id:   baseMessage.ID,
          code: mcp.INVALID_REQUEST,
          err:  &UnparseableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method},
       }
    } else {
       // 执行初始化请求前的钩子函数
       s.hooks.beforeInitialize(baseMessage.ID, &request)
       // 处理初始化请求
       result, err = s.handleInitialize(ctx, baseMessage.ID, request)
    }
    // 如果处理过程中发生错误
    if err != nil {
       // 执行错误钩子函数
       s.hooks.onError(baseMessage.ID, baseMessage.Method, &request, err)
       // 返回错误响应
       return err.ToJSONRPCError()
    }
    // 执行初始化请求后的钩子函数
    s.hooks.afterInitialize(baseMessage.ID, &request, result)
    // 返回成功响应
    return createResponse(baseMessage.ID, *result)

// 其他方法处理逻辑类似,省略...

// 如果方法不匹配任何已知方法,返回方法未找到错误响应
default:
    return createErrorResponse(
       baseMessage.ID,
       mcp.METHOD_NOT_FOUND,
       fmt.Sprintf("Method %s not found", baseMessage.Method),
    )
}

代码比较多,可以自行查看:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/request_handler.go#L12

五、总结

mark3labs/mcp-go框架是一个简单易用的MCP框架,基本上实现了MCP协议,提供对 MCP 核心规范的完整支持,包括资源(Resources)、工具(Tools)、提示(Prompts)等核心组件,确保与主流 LLM 客户端(如 Claude、Cline)的兼容性。尤其是mark3labs/mcp-go提供的hooks机制,可以让开发者更好的使用类似gin空间一样的中间件能力,比如实现统一鉴权等能力。除此之外,还可以与主流的Web框架,如gin框架进行集成,进一步扩展了mark3labs/mcp-go的适用性。
原文链接:https://my.oschina.net/qiangmzsx/blog/18014540
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章