您现在的位置是:首页 > 文章详情

万字长文解读MCP框架,让你掌握mark3labs/mcp-go

日期:2025-03-27点击:41

一、引言

《万字长文,带你读懂 Anthropic MCP》中我们介绍了MCP的基本框架和组件,并初步说了在golang中的框架metoro-io/mcp-golang和mark3labs/mcp-go。本文将通过实践和源码的方式先解读mark3labs/mcp-go。

二、MCP-Server的简述

MCP Server一般为轻量的服务端程序,通过一种标准的协议(MCP)暴露出特定资源的一些特定的能力。

2.1 连接生命周期

2.1.1 初始化连接

  1. 客户端发送带有协议版本和功能initialize请求。
  2. 服务器以其协议版本和功能进行响应
  3. 客户端发送initialized通知作为确认
  4. 开始正常信息交换

2.1.2 信息交换

初始化后,支持以下模式:
  1. 请求-响应:客户端或服务器发送请求,对方响应
  2. 通知:任何一方发送单向消息

2.1.3 终止

 enum ErrorCode { // Standard JSON-RPC error codes ParseError = -32700, InvalidRequest = -32600, MethodNotFound = -32601, InvalidParams = -32602, InternalError = -32603 }
任何一方都可以终止连接:
  1. 通过close()干净关闭
  2. 传输断开
  3. 错误情况
    1. 请求的错误响应
    2. 传输中的错误事件
    3. 协议级错误处理程序
常见错误码:
 

2.2 MCP Server的业务能力

Request Method 发起方 响应方 描述
initialized Client Server 初始化会话
tools-list Client Server 发现可用的工具
tools/call Client Server 调用工具
resources/list Client Server 发现可用的资源
resources/read Client Server 获取资源内容
resources/templates Client Server 发现可用的参数化资源
resources/subscribe Client Server 订阅特定资源,监听其变化事件
prompts/list Client Server 发现可用的提示词
prompts/get Client Server 获取特定提示词
roots/list Server Client 列出服务器有权访问的客户端文件系统根节点(暴露目录和文件)
sampling/create Server Client 启用服务器的AI生成能力( sampling creation )

三、mark3labs/mcp-go示例

我们就以框架中的官方示例代码为引子一步步解读流程和框架代码。
 package main import ( "context" "errors" "fmt" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" ) func main() { // 创建MCP服务器 s := server.NewMCPServer( "Demo 🚀", // 服务器名称 "1.0.0", // 服务器版本 ) // 添加工具 tool := mcp.NewTool("hello_world", // 工具名称 mcp.WithDescription("Say hello to someone"), // 工具描述 mcp.WithString("name", // 参数名称 mcp.Required(), // 参数是必需的 mcp.Description("Name of the person to greet"), // 参数描述 ), ) // 为工具添加处理器 s.AddTool(tool, helloHandler) // 启动标准输入输出服务器 if err := server.ServeStdio(s); err != nil { fmt.Printf("Server error: %v\n", err) // 打印服务器错误 } } func helloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { // 从请求参数中获取名字参数,并断言为字符串类型 name, ok := request.Params.Arguments["name"].(string) if !ok { // 如果断言失败,返回错误 return nil, errors.New("name must be a string") } // 返回包含问候语的结果 return mcp.NewToolResultText(fmt.Sprintf("Hello, %s!", name)), nil }

 

3.1 搭建一个联调环境

就以上面的代码为例,将上面的代码编译为二进制命令:
 $ go build -v -o server

再启动mcp inspetor:

 $ npx -y @modelcontextprotocol/inspector ./server

这样一个简单的MCP Client和MCP Server就搭建好了,后续也为我们开发测试构建好了环境。

四、源码解读

在上面的代码中main函数中的第一个代码就是server.NewMCPServer,那我们就从MCPServer这个结构体入手。

4.1 MCPServer结构体

代码地址:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/server.go#L135
 type MCPServer struct { mu sync.RWMutex // 用于保护共享资源,确保并发访问时的数据一致性 name string // 服务器的名称,用于标识服务器 version string // 服务器的版本,用于跟踪和管理服务器的不同版本 instructions string // 服务器的指令,通常在初始化响应中返回给客户端,提供使用指南或帮助信息 resources map[string]resourceEntry // 存储服务器支持的资源及其处理函数 resourceTemplates map[string]resourceTemplateEntry // 存储资源模板及其处理函数,支持URI模板匹配多类似资源 prompts map[string]mcp.Prompt // 存储服务器支持的提示,用于与用户交互 promptHandlers map[string]PromptHandlerFunc // 存储处理提示请求的函数,每个提示对应一个处理函数 tools map[string]ServerTool // 存储服务器支持的工具及其处理函数 notificationHandlers map[string]NotificationHandlerFunc // 存储处理传入通知的函数,接收客户端通知并处理 capabilities serverCapabilities // 定义服务器支持的功能特性,包括资源、提示、工具和日志记录等 sessions sync.Map // 存储当前活跃的客户端会话,用于跟踪用户交互 initialized atomic.Bool // 使用原子操作标记服务器是否已初始化,确保线程安全 hooks *Hooks // 存储服务器钩子,允许在请求处理前后或返回错误前执行自定义逻辑 }

MCPServer 是 Model Control Protocol (MCP) 服务器的实现,用于处理包括资源、提示和工具在内的各种类型的请求。

     

4.2 MCPServer初始化

看看如何创建一个MCPServer对象。
 func NewMCPServer( name, version string, opts ...ServerOption, ) *MCPServer { s := &MCPServer{ resources: make(map[string]resourceEntry), resourceTemplates: make(map[string]resourceTemplateEntry), prompts: make(map[string]mcp.Prompt), promptHandlers: make(map[string]PromptHandlerFunc), tools: make(map[string]ServerTool), name: name, version: version, notificationHandlers: make(map[string]NotificationHandlerFunc), capabilities: serverCapabilities{ tools: nil, resources: nil, prompts: nil, logging: false, }, } for _, opt := range opts { opt(s) } return s }

在NewMCPServer()方法中,我们需要关注的opts ...ServerOption,进一步看看ServerOption有哪些选项:

选项 功能 使用方式
WithResourceCapabilities 配置资源相关的服务器功能,如订阅和资源列表变化通知 WithResourceCapabilities(subscribe, listChanged bool)
WithHooks 添加钩子函数,用于在请求处理前后执行特定逻辑 WithHooks(hooks *Hooks)
WithPromptCapabilities 配置提示相关的服务器功能,如提示列表变化通知 WithPromptCapabilities(listChanged bool)
WithToolCapabilities 配置工具相关的服务器功能,如工具列表变化通知 WithToolCapabilities(listChanged bool)
WithLogging 启用服务器日志记录功能 WithLogging()
WithInstructions 设置服务器指令,用于在初始化响应中返回给客户端 WithInstructions(instructions string)
它接受一个 Hooks 类型的指针作为参数。允许在创建 MCPServer 实例时,为服务器添加自定义的钩子函数,这些钩子函数可以在请求处理前后或返回错误给客户端之前执行。
hooks机制对开发和流程是非常有效的。框架中给的hooks能力有:
字段名 类型 描述
OnBeforeAny []BeforeAnyHookFunc 在请求被解析后但方法调用前执行的钩子函数。
OnSuccess []OnSuccessHookFunc 在请求成功生成结果但结果尚未发送给客户端之前执行的钩子函数。
OnError []OnErrorHookFunc 在请求解析或方法执行过程中发生错误时执行的钩子函数。
OnBeforeInitialize []OnBeforeInitializeFunc 在处理初始化请求前执行的钩子函数。
OnAfterInitialize []OnAfterInitializeFunc 在处理初始化请求后执行的钩子函数。
OnBeforePing []OnBeforePingFunc 在处理 Ping 请求前执行的钩子函数。
OnAfterPing []OnAfterPingFunc 在处理 Ping 请求后执行的钩子函数。
OnBeforeListResources []OnBeforeListResourcesFunc 在处理列出资源请求前执行的钩子函数。
OnAfterListResources []OnAfterListResourcesFunc 在处理列出资源请求后执行的钩子函数。
OnBeforeListResourceTemplates []OnBeforeListResourceTemplatesFunc 在处理列出资源模板请求前执行的钩子函数。
OnAfterListResourceTemplates []OnAfterListResourceTemplatesFunc 在处理列出资源模板请求后执行的钩子函数。
OnBeforeReadResource []OnBeforeReadResourceFunc 在处理读取资源请求前执行的钩子函数。
OnAfterReadResource []OnAfterReadResourceFunc 在处理读取资源请求后执行的钩子函数。
OnBeforeListPrompts []OnBeforeListPromptsFunc 在处理列出提示请求前执行的钩子函数。
OnAfterListPrompts []OnAfterListPromptsFunc 在处理列出提示请求后执行的钩子函数。
OnBeforeGetPrompt []OnBeforeGetPromptFunc 在处理获取提示请求前执行的钩子函数。
OnAfterGetPrompt []OnAfterGetPromptFunc 在处理获取提示请求后执行的钩子函数。
OnBeforeListTools []OnBeforeListToolsFunc 在处理列出工具请求前执行的钩子函数。
OnAfterListTools []OnAfterListToolsFunc 在处理列出工具请求后执行的钩子函数。
OnBeforeCallTool []OnBeforeCallToolFunc 在处理调用工具请求前执行的钩子函数。
OnAfterCallTool []OnAfterCallToolFunc 在处理调用工具请求后执行的钩子函数。
现在模拟创建一个完整的MCPServer:
 hooks := &server.Hooks{} hooks.AddBeforeAny(func(id any, method mcp.MCPMethod, message any) { fmt.Printf("beforeAny: %s, %v, %v\n", method, id, message) }) hooks.AddOnSuccess(func(id any, method mcp.MCPMethod, message any, result any) { fmt.Printf("onSuccess: %s, %v, %v, %v\n", method, id, message, result) }) hooks.AddOnError(func(id any, method mcp.MCPMethod, message any, err error) { fmt.Printf("onError: %s, %v, %v, %v\n", method, id, message, err) }) hooks.AddBeforeInitialize(func(id any, message *mcp.InitializeRequest) { fmt.Printf("beforeInitialize: %v, %v\n", id, message) }) hooks.AddAfterInitialize(func(id any, message *mcp.InitializeRequest, result *mcp.InitializeResult) { fmt.Printf("afterInitialize: %v, %v, %v\n", id, message, result) }) hooks.AddAfterCallTool(func(id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) { fmt.Printf("afterCallTool: %v, %v, %v\n", id, message, result) }) hooks.AddBeforeCallTool(func(id any, message *mcp.CallToolRequest) { fmt.Printf("beforeCallTool: %v, %v\n", id, message) }) // 创建MCP服务器 s := server.NewMCPServer( "Demo 🚀", // 服务器名称 "1.0.0", // 服务器版本 server.WithLogging(), server.WithToolCapabilities(true), server.WithResourceCapabilities(true, true), server.WithPromptCapabilities(true), server.WithInstructions("initialized"), server.WithHooks(hooks), )

 

4.3 Tools模块

4.3.1 创建Tool

mark3labs/mcp-go框架中创建Tool有两个方式:
  1. mcp.NewTool(name string, opts ...ToolOption) Tool
  2. mcp.NewToolWithRawSchema(name, description string, schema json.RawMessage) Tool
下面通过一段代码进行对比:
 方式一:mcp.NewTool() tool := mcp.NewTool("hello_world", // 工具名称 mcp.WithDescription("Say hello to someone"), // 工具描述 mcp.WithString("name", // 参数名称 mcp.Required(), // 参数是必需的 mcp.Description("Name of the person to greet"), // 参数描述 ), ) 方式二:mcp.NewToolWithRawSchema() rawSchema := json.RawMessage(`{ "type": "object", "properties": { "name": {"type": "string", "description": "Name of the person to greet"} }, "required": ["name"] }`) // Create a tool with raw schema toolRS := mcp.NewToolWithRawSchema("hello_world_1", "Say hello to someone", rawSchema)

其中rawSchema的结构需要符合ToolInputSchema结构体:

 type ToolInputSchema struct { Type string `json:"type"` Properties map[string]interface{} `json:"properties,omitempty"` Required []string `json:"required,omitempty"` }

需要注意的是Properties来源于jsonSchema,所以具备的需要校验属性,比如default、maximum、minimum、maxLength、minLength、enum等等。其中key为请求传入的参数字段,interface{}为对key的各种属性校验。

更建议使用方式一:mcp.NewTool()更加符合编码方式,也更好控制器生成的 jsonSchema

4.3.2 存放Tool

创建好Tool值之后,调用server的方法AddTool()将Tool添加进入,相当于web框架中的添加路由与相关handle的关系。相关代码如下:
 // AddTool registers a new tool and its handler func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) { s.AddTools(ServerTool{Tool: tool, Handler: handler}) } // AddTools registers multiple tools at once func (s *MCPServer) AddTools(tools ...ServerTool) { // 检查工具 if s.capabilities.tools == nil { s.capabilities.tools = &toolCapabilities{} } // 加锁 s.mu.Lock() // 遍历工具 for _, entry := range tools { s.tools[entry.Tool.Name] = entry } // 获取初始化状态 initialized := s.initialized.Load() s.mu.Unlock() // 发送通知 if initialized { s.sendNotificationToAllClients("notifications/tools/list_changed", nil) } }

可以看出Tool是放入到MCPServer的tools字段中,使用name作为key,ServerTool作为value,其中ServerTool结构如下:

 type ServerTool struct { Tool mcp.Tool Handler ToolHandlerFunc }

框架还提供了:

  • DeleteTools(names... string)作为删除Tool关联的方法。
  • SetTools(tools ...ServerTool)可以设置当前所有的Tool 列表。

4.4 Resource模块

老规矩先来一个demo,再看器源码实现:
 // Static resource example - exposing a README file resource := mcp.NewResource( "docs://readme", "Project README", mcp.WithResourceDescription("The project's README file"), mcp.WithMIMEType("text/markdown"), ) // Add resource with its handler s.AddResource(resource, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { content, err := os.ReadFile("main.go") if err != nil { return nil, err } return []mcp.ResourceContents{ mcp.TextResourceContents{ URI: "docs://readme", MIMEType: "text/markdown", Text: string(content), }, }, nil })

 

在MCP中Resource是指允许Server公开可供客户端读取并用作交互上下文的数据和内容。有很多类型的Resource:
  • File contents 文件内容
  • Database records 数据库记录
  • API responses API响应
  • Live system data 实时系统数据
  • images 图像
  • Log files 日志文件
  • 等等... ...
框架中Resource的结构体如下:
  type Resource struct { Annotated // 包含可选注解,用于告知客户端如何使用或显示对象 // 资源的URI URI string `json:"uri"` // 资源的唯一标识符,用于定位和访问资源 // 资源的可读名称 // // 客户端可以使用此名称来填充UI元素 Name string `json:"name"` // 资源的显示名称,便于用户理解和界面展示 // 对此资源所代表内容的描述 // // 客户端可以使用此描述来帮助大型语言模型(LLM)理解可用资源 // 这可以看作是对模型的“提示” Description string `json:"description,omitempty"` // 资源的详细描述,为模型提供上下文信息 // 如果已知,此资源的MIME类型 MIMEType string `json:"mimeType,omitempty"` // 资源的媒体类型,如text/plain、image/jpeg等,用于指示资源的内容格式 } type Annotated struct { Annotations *struct { // 描述此对象或数据的预期客户是谁 // // 它可以包含多个条目,以指示对多个受众有用的内容(例如,`["user", "assistant"]`) Audience []Role `json:"audience,omitempty"` // 受众群体,指示数据对哪些角色或用户群体有用 // 描述此数据对服务器操作的重要性 // // 值为1表示“最重要”,并指示数据实际上是必需的,而0表示“最不重要”,并指示数据完全是可选的 Priority float64 `json:"priority,omitempty"` // 优先级,表示数据对服务器操作的重要程度,范围从0(最不重要)到1(最重要) } `json:"annotations,omitempty"` }

其中:

  • URI用于定位一个具体资源的标识,场景的有http://、file://、postgres://等等还可以去https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml 看看。
  • MIMEType表示文件的类型,常见的有 text/html、image/png,这里也可以查看到更多:https://www.iana.org/assignments/media-types/media-types.xhtml。
 func (s *MCPServer) AddResource( resource mcp.Resource, handler ResourceHandlerFunc, ) { // 检查资源 if s.capabilities.resources == nil { s.capabilities.resources = &resourceCapabilities{} } // 加锁 s.mu.Lock() // 解锁(defer) defer s.mu.Unlock() // 存储资源 s.resources[resource.URI] = resourceEntry{ resource: resource, handler: handler, } }

resource是放入到MCPServer的resources字段中,使用URI作为key,resourceEntry作为value,其中resourceEntry结构如下:

 type resourceEntry struct { resource mcp.Resource handler ResourceHandlerFunc }

框架还提供了添加Resource templates的功能,主要是针对动态资源,服务器可以公开 URI 模板 ,客户端可以使用它来构建有效的资源 URI。

 // Dynamic resource example - user profiles by ID template := mcp.NewResourceTemplate( "users://{id}/profile", "User Profile", mcp.WithTemplateDescription("Returns user profile information"), mcp.WithTemplateMIMEType("application/json"), ) // Add template with its handler s.AddResourceTemplate(template, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { // Extract ID from the URI using regex matching // The server automatically matches URIs to templates userID := extractIDFromURI(request.Params.URI) profile := fmt.Sprintf("Hello %s", userID) // Your DB/API call here return []mcp.ResourceContents{ mcp.TextResourceContents{ URI: request.Params.URI, MIMEType: "application/json", Text: profile, }, }, nil }) // extractIDFromURI 从给定的 URI 中提取用户 ID。 // 假设 URI 的格式为 "users://{id}/profile"。 func extractIDFromURI(uri string) string { // 定义正则表达式来匹配 URI 中的 ID re := regexp.MustCompile(`users://([^/]+)/profile`) // 使用正则表达式查找匹配项 matches := re.FindStringSubmatch(uri) // 如果找到了匹配项,并且匹配项的数量正确,则返回 ID if len(matches) == 2 { return matches[1] } // 如果没有找到匹配项,或者匹配项的数量不正确,则返回空字符串 return "" }

 

4.5 Prompts添加

Prompts创建可重复使用的提示模板和工作流程,提示使服务器能够定义可重复使用的提示模板和工作流程。
 // Simple greeting prompt s.AddPrompt(mcp.NewPrompt("greeting", mcp.WithPromptDescription("A friendly greeting prompt"), mcp.WithArgument("name", mcp.ArgumentDescription("Name of the person to greet"), ), ), func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { name := request.Params.Arguments["name"] if name == "" { name = "friend" } return mcp.NewGetPromptResult( "A friendly greeting", []mcp.PromptMessage{ mcp.NewPromptMessage( mcp.RoleAssistant, mcp.NewTextContent(fmt.Sprintf("Hello, %s! How can I help you today?", name)), ), }, ), nil })

 

Prompt的结构体如下:
 // Prompt 表示服务器提供的提示或提示模板。 // 如果 Arguments 非空且包含元素,则表示该提示是一个模板, // 在调用 prompts/get 时需要提供参数值。 // 如果 Arguments 为空或为 nil,则这是一个不需要参数的静态提示。 type Prompt struct { // 提示或提示模板的名称。 Name string `json:"name"` // 提示提供内容的可选描述。 Description string `json:"description,omitempty"` // 用于模板化提示的参数列表。 // 参数的存在表明这是一个模板提示。 Arguments []PromptArgument `json:"arguments,omitempty"` } // PromptArgument 描述提示模板可以接受的参数。 // 当提示包含参数时,客户端在发出 prompts/get 请求时 // 必须为所有必需参数提供值。 type PromptArgument struct { // 参数的名称。 Name string `json:"name"` // 参数的可读描述。 Description string `json:"description,omitempty"` // 此参数是否必须提供。 // 如果为 true,则客户端在调用 prompts/get 时必须包含此参数。 Required bool `json:"required,omitempty"` }

可以通过mcp.NewPrompt方法来生成Prompt对象:

 func NewPrompt(name string, opts ...PromptOption) Prompt { prompt := Prompt{ Name: name, } for _, opt := range opts { opt(&prompt) } return prompt }

又见到了opts ...PromptOption,看看有哪些选项:

  • WithPromptDescription用于设置description;
  • WithArgument用于设置arguments;

4.6 选择传输方式

现在代码已经来到了:
 // 启动标准输入输出服务器 if err := server.ServeStdio(s); err != nil { fmt.Printf("Server error: %v\n", err) // 打印服务器错误 }

MCP当前主要提供两类Stdio transport和HTTP with SSE transport 。

4.6.1 Stdio

创建StdioServer

StdioServer的结构体:
 type StdioServer struct { server *MCPServer errLogger *log.Logger contextFunc StdioContextFunc }

其中在,重要的字段有server和contextFunc,给MCPServer进来就是为了具备MCP的能力,只是使用stdio的传输方式。contextFunc是为了让外部自定义的context可以进入到StdioServer,可以用于结束服务和控制超时.

  // 为MCP Server 启用stdio func ServeStdio(server *MCPServer, opts ...StdioOption) error { // 创建Stdio服务器 s := NewStdioServer(server) // 设置错误日志 s.SetErrorLogger(log.New(os.Stderr, "", log.LstdFlags)) // 应用选项 for _, opt := range opts { opt(s) } // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 设置信号通道 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) // 监听信号 go func() { <-sigChan cancel() }() // 开始监听 return s.Listen(ctx, os.Stdin, os.Stdout) }

其中最关键的是Listen方法,它是 StdioServer 类型的一个关键方法,用于监听标准输入输出的 JSON-RPC 消息。

 // Listen 启动监听,从提供的输入读取 JSON-RPC 消息,并将响应写入提供的输出。 // 它将持续运行,直到上下文被取消或发生错误。 // 如果在读取输入或写入输出时遇到问题,将返回错误。 func (s *StdioServer) Listen( ctx context.Context, // 监听过程的上下文,用于控制生命周期和传递请求范围的信息 stdin io.Reader, // 标准输入流,用于读取客户端发送的 JSON-RPC 消息 stdout io.Writer, // 标准输出流,用于将响应写回客户端 ) error { // 由于标准输入输出只有一个客户端,因此设置一个静态客户端上下文,SessionId为stdio if err := s.server.RegisterSession(&stdioSessionInstance); err != nil { // 如果会话注册失败,返回错误 return fmt.Errorf("register session: %w", err) } // 确保在函数结束时注销会话 defer s.server.UnregisterSession(stdioSessionInstance.SessionID()) // 更新上下文,将会话信息加入 ctx = s.server.WithContext(ctx, &stdioSessionInstance) // 如果存在自定义上下文函数,则应用该函数修改上下文 if s.contextFunc != nil { ctx = s.contextFunc(ctx) } // 创建一个带缓冲的读取器,用于从标准输入流高效读取数据 reader := bufio.NewReader(stdin) // 启动一个协程专门处理通知 go func() { for { select { case notification := <-stdioSessionInstance.notifications: // 收到通知时,调用 writeResponse 方法将通知写入标准输出 err := s.writeResponse(notification, stdout) if err != nil { // 如果写入通知时出错,记录错误日志 s.errLogger.Printf("Error writing notification: %v", err) } case <-ctx.Done(): // 如果上下文完成,退出协程 return } } }() // 主循环,用于处理输入消息 for { select { case <-ctx.Done(): // 如果上下文完成,返回上下文错误 return ctx.Err() default: // 使用协程使读取操作可取消 readChan := make(chan string, 1) // 用于接收读取到的行 errChan := make(chan error, 1) // 用于接收读取错误 go func() { line, err := reader.ReadString('\n') // 读取一行输入 if err != nil { errChan <- err // 发送读取错误 return } readChan <- line // 发送读取到的行 }() select { case <-ctx.Done(): // 如果上下文完成,返回上下文错误 return ctx.Err() case err := <-errChan: // 处理读取错误 if err == io.EOF { // 如果是文件结束符,表示输入结束,返回 nil return nil } // 其他错误则记录日志并返回 s.errLogger.Printf("Error reading input: %v", err) return err case line := <-readChan: // 处理读取到的行 if err := s.processMessage(ctx, line, stdout); err != nil { // 如果处理消息时出错 if err == io.EOF { // 如果是文件结束符,返回 nil return nil } // 其他错误则记录日志并返回 s.errLogger.Printf("Error handling message: %v", err) return err } } } } }

 

4.6.2 SSE

SSE模式凭借其分布式能力、实时性和架构灵活性,成为MCP在 企业级应用、云端协作、动态数据流处理等场景的首选。所以对SSE的支持程度和易用程度,对于一个MCP框架而言是非常重要的。
mark3labs/mcp-go如何支持SSE呢?看看如下代码:
 mcpServer := NewMCPServer() sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080")) log.Printf("SSE server listening on :8080") if err := sseServer.Start(":8080"); err != nil { log.Fatalf("Server error: %v", err) }

现在已经知道如何在mark3labs/mcp-go中启用SSE了,现在来分析一下,它是如何实现的。

创建SSEServer

SSEServer的结构体如下:
 // SSEServer implements a Server-Sent Events (SSE) based MCP server. // It provides real-time communication capabilities over HTTP using the SSE protocol. type SSEServer struct { server *MCPServer // MCPServer 实例,用于处理实际的消息传递和通信逻辑 baseURL string // SSE 服务器的基础 URL,用于构建完整的端点路径 basePath string // SSE 服务器的基础路径,通常用于区分不同的服务或版本 messageEndpoint string // 消息端点的路径,客户端通过此端点发送 JSON-RPC 消息 sseEndpoint string // SSE 端点的路径,客户端通过此端点建立 SSE 连接 sessions sync.Map // 存储活动 SSE 会话的同步映射,用于跟踪和管理客户端连接 srv *http.Server // 内部的 HTTP 服务器实例,用于处理 HTTP 请求和响应 contextFunc SSEContextFunc // 可选的上下文函数,用于根据请求内容自定义上下文 }

与之前的StdioServer相比,SSEServer多了用于http中使用的URI、path,还有srv这是一个httpServer的类型,用于支持HTTP请求和响应。继续查看如何构建SSEServer:

 // NewSSEServer creates a new SSE server instance with the given MCP server and options. func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer { s := &SSEServer{ server: server, sseEndpoint: "/sse", messageEndpoint: "/message", } // Apply all options for _, opt := range opts { opt(s) } return s }

还是采用的才是经典Option模式,继续看看SSEOption有哪些选项:

名称 功能 使用方式
WithBaseURL 设置 SSE 服务器的基础 URL WithBaseURL("https://example.com")
WithBasePath 设置 SSE 服务器的基础路径 WithBasePath("/v1")
WithMessageEndpoint 设置消息端点的路径 WithMessageEndpoint("/custom-message")
WithSSEEndpoint 设置 SSE 端点的路径 WithSSEEndpoint("/custom-sse")
WithHTTPServer 设置 HTTP 服务器实例(通常用于测试或自定义服务器配置) WithHTTPServer(customHttpServer)
WithContextFunc 设置一个函数,用于根据请求内容自定义上下文 WithContextFunc(func(ctx context.Context, r *http.Request) context.Context { ... })
WithHTTPServer适用于需要自定义服务器配置的场景,为后续替换更好性能的http服务实例打下基础,也体现了mark3labs/mcp-go扩展性。
剩下的Start()就是常见的启动http服务实例的功能了。
 func (s *SSEServer) Start(addr string) error { s.srv = &http.Server{ Addr: addr, Handler: s, } return s.srv.ListenAndServe() }

还可以调用Shutdown()实现对服务器的关闭。

4.6.3 集成到gin框架

在实际开发中,很多公司内部的业务有自己的框架,集成了许许多多的独特功能。总不能为了使用MCP重写一套Web框架,此时就需要使用到mark3labs/mcp-go集成到Web框架的能力了。下面以gin框架为例:
 // 创建一个新的 Gin 引擎 r := gin.Default() // 创建一个新的 MCPServer 实例(假设这是 SSEServer 所需的) mcpServer := server.NewMCPServer("gin-mcp-server", "1.0.0") // 根据你的实际代码调整 // mcpServer 新加Tool、Resource、Prompt // ... ... // 创建一个新的 SSEServer 实例,并传入 MCPServer sseServer := server.NewSSEServer(mcpServer) // 将 SSEServer 的 SSE 端点和处理函数集成到 Gin 路由中 r.GET(sseServer.CompleteSsePath(), func(c *gin.Context) { sseServer.ServeHTTP(c.Writer, c.Request) }) // 将 SSEServer 的消息端点和处理函数集成到 Gin 路由中 r.POST(sseServer.CompleteMessagePath(), func(c *gin.Context) { sseServer.ServeHTTP(c.Writer, c.Request) }) // 启动 Gin 服务器 if err := r.Run("localhost:8081"); err != nil { log.Fatalf("Gin server startup failed: %v", err) }

上述的代码会生成两个路由:

路由
方法
作用
实例
/sse
GET
  • 获取SessionID
  • 接受Server响应
 请求: curl 'http://localhost:3000/sse?transportType=sse&url=http%3A%2F%2Flocalhost%3A8081%2Fsse' \ -H 'Accept: */*' \ -H 'Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7' \ -H 'Cache-Control: no-cache' \ -H 'Connection: keep-alive' \ -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' 响应: event: endpoint data: /message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766

 

/message
POST
  • 使用SessionID保持会话
  • 发起功能请求
 请求: curl 'http://localhost:3000/message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766' \ -H 'Accept: */*' \ -H 'Cache-Control: no-cache' \ -H 'Connection: keep-alive' \ -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' \ -H 'content-type: application/json' --data-raw '{"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"sampling":{},"roots":{"listChanged":true}},"clientInfo":{"name":"mcp-inspector","version":"0.7.0"}},"jsonrpc":"2.0","id":0}' /message的响应: {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}} /sse收到的响应: event: message data: {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}} 

 

4.7 处理请求

之前的内容,解析了如何构建MCP Server的实践和背后的实现。下面我们还需要了解mark3labs/mcp-go如何接受请求并进行响应的。

4.7.1 路由入口

从SSEServer结构体中已经知道使用的http.Server,所以其接受请求的入口为ServeHTTP方法,实现了 http.Handler 接口,用于处理 HTTP 请求。根据请求的路径,它会将请求分发到不同的处理方法(handleSSE 或 handleMessage),或者返回 404 未找到。
 func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 获取请求的路径 path := r.URL.Path // 使用精确路径匹配,而不是模糊包含 ssePath := s.CompleteSsePath() // 获取完整的 SSE 路径 if ssePath != "" && path == ssePath { // 如果请求路径与 SSE 路径匹配,则处理 SSE 请求 s.handleSSE(w, r) return // 处理完成后直接返回,不再继续后续逻辑 } // 获取消息处理的完整路径 messagePath := s.CompleteMessagePath() if messagePath != "" && path == messagePath { // 如果请求路径与消息处理路径匹配,则处理消息请求 s.handleMessage(w, r) return // 处理完成后直接返回,不再继续后续逻辑 } // 如果请求路径不匹配任何已知路径,则返回 404 未找到 http.NotFound(w, r) }

其中:

  • SSE路径为:s.baseURL + s.basePath + s.sseEndpoint
  • Message路径为:s.baseURL + s.basePath + s.messageEndpoint
这些信息都是可以在NewSSEServer()方法中设置。
需要详细查看的是 s.handleSSE(w, r)和s.handleMessage(w, r)方法,他们分别处理/see和/message的请求。

4.7.2 handleSSE

handleSSE实现了一个处理服务器发送事件(SSE)的HTTP处理器。SSE是一种允许服务器向客户端发送自动更新的技术。主要流程:
  1. 请求方法检查:只允许GET请求。
  2. 设置响应头:设置适当的SSE响应头。
  3. 创建会话:为每个客户端创建一个新的SSE会话。
  4. 注册和注销会话:在服务器中注册会话,并在处理完成后注销。
  5. 通知处理器:启动一个goroutine处理来自通知通道的事件,并将其发送到客户端。
  6. 主事件循环:处理来自事件队列的事件,并将其发送到客户端。
 // 它设置适当的头信息并为客户端创建一个新的会话。 func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { // 1. 检查请求方法是否为GET,如果不是,返回405 Method Not Allowed错误 if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 2. 设置SSE相关的响应头 w.Header().Set("Content-Type", "text/event-stream") // 设置内容类型为text/event-stream w.Header().Set("Cache-Control", "no-cache") // 禁用缓存 w.Header().Set("Connection", "keep-alive") // 保持连接活跃 w.Header().Set("Access-Control-Allow-Origin", "*") // 允许所有域跨域请求 // 检查ResponseWriter是否支持Flush,如果不支持,返回500 Internal Server Error错误 flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // 3. 创建一个新的会话ID和会话对象 sessionID := uuid.New().String() // 生成唯一的会话ID session := &sseSession{ writer: w, // 响应写入器 flusher: flusher, // 刷新器 done: make(chan struct{}), // 用于通知会话结束的通道 eventQueue: make(chan string, 100), // 事件队列,缓冲区大小为100 sessionID: sessionID, // 会话ID notificationChannel: make(chan mcp.JSONRPCNotification, 100), // 通知通道,缓冲区大小为100 } // 4. 将会话存储到会话存储中,并在处理完成后删除 s.sessions.Store(sessionID, session) defer s.sessions.Delete(sessionID) // 在服务器中注册会话,如果注册失败,返回500 Internal Server Error错误 if err := s.server.RegisterSession(session); err != nil { http.Error(w, fmt.Sprintf("Session registration failed: %v", err), http.StatusInternalServerError) return } // 在处理完成后注销会话 defer s.server.UnregisterSession(sessionID) // 5. 启动一个goroutine处理通知通道中的事件 go func() { for { select { case notification := <-session.notificationChannel: // 从通知通道接收通知 eventData, err := json.Marshal(notification) // 将通知序列化为JSON if err == nil { select { case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 将事件发送到事件队列 // 事件成功入队 case <-session.done: // 如果会话结束,退出goroutine return } } case <-session.done: // 如果会话结束,退出goroutine return case <-r.Context().Done(): // 如果请求上下文被取消,退出goroutine return } } }() // 生成消息端点URL并发送初始的endpoint事件 messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) // 发送endpoint事件 flusher.Flush() // 刷新响应,确保事件立即发送到客户端 // 6. 主事件循环,运行在HTTP处理器goroutine中 for { select { case event := <-session.eventQueue: // 从事件队列接收事件 fmt.Fprint(w, event) // 将事件写入响应 flusher.Flush() // 刷新响应,确保事件立即发送到客户端 case <-r.Context().Done(): // 如果请求上下文被取消,关闭会话并退出 close(session.done) return } } }

第三阶段又看到Session了,与之前的stdioSession相比,sseSession明显复杂多了,它们都实现接口ClientSession:

 type ClientSession interface { // NotificationChannel provides a channel suitable for sending notifications to client. NotificationChannel() chan<- mcp.JSONRPCNotification // SessionID is a unique identifier used to track user session. SessionID() string }

sseSession是专门用于表示一个基于服务器发送事件(Server-Sent Events, SSE)协议的活跃连接。sseSession负责管理客户端与服务器之间的单向实时数据推送,并保持会话。其结构体如下:

 type sseSession struct { writer http.ResponseWriter // HTTP响应写入器,用于向客户端发送数据 flusher http.Flusher // HTTP刷新器,用于刷新响应缓冲区,确保数据立即发送给客户端 done chan struct{} // 用于通知会话结束的通道 eventQueue chan string // 用于排队事件的通道,存储待发送给客户端的事件 sessionID string // 会话的唯一标识符 notificationChannel chan mcp.JSONRPCNotification // 用于接收JSON-RPC通知的通道 }

4.7.3 handleMessage

handleMessage 方法是 SSEServer 类型的一个方法,用于处理传入的 JSON-RPC 消息,并通过 SSE 连接和 HTTP 响应返回结果。其主要流程:
  1. 请求验证:检查请求方法
    1. 检查请求方法:方法首先检查请求方法是否为 HTTP POST。如果不是,返回 "Method not allowed" 错误,并终止处理。
    2. 验证 sessionId 参数:从请求的 URL 查询参数中获取 sessionId。如果缺失,返回 "Missing sessionId" 错误。
    3. 加载会话:使用 sessionId 从会话存储中加载会话。如果会话不存在,返回 "Invalid session ID" 错误。
  2. 设置上下文
    1. 调用 s.server.WithContext 方法,将请求上下文和会话信息合并,生成新的上下文。如果提供了 contextFunc,则进一步处理上下文。
  3. 解析 JSON-RPC 消息
    1. 使用 json.NewDecoder 解析请求体中的原始 JSON 消息。如果解析失败,返回 "Parse error" 错误。
  4. 处理消息
    1. 将解析后的消息传递给 s.server.HandleMessage 方法进行处理,并获取响应结果。
  5. 发送响应
      如果 HandleMessage 返回了响应(非通知),则:
      如果 HandleMessage 没有返回响应(通知),则仅设置 HTTP 响应状态码为 202 Accepted,不发送响应体。
    1. 将响应编码为 JSON 格式。
    2. 将响应事件加入会话的事件队列,供 SSE 连接发送。
    3. 设置 HTTP 响应头为 application/json,状态码为 202 Accepted,并发送响应体。
  6. 事件队列处理
尝试将事件加入会话的事件队列。如果队列已满或会话已关闭,则丢弃事件。
那么handleSSE和handleMessage的关系是怎样的呢?使用一张图来说明:
 
 // handleMessage 处理来自客户端的JSON-RPC消息,并通过SSE连接和HTTP响应返回结果 func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) { // 1. 如果请求方法不是POST,则返回JSON-RPC错误响应,表示方法不允许 if r.Method != http.MethodPost { s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed") return } // 从请求的URL查询参数中获取sessionId sessionID := r.URL.Query().Get("sessionId") // 如果sessionId为空,则返回JSON-RPC错误响应,表示缺少sessionId参数 if sessionID == "" { s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId") return } // 从session存储中加载与sessionId对应的session sessionI, ok := s.sessions.Load(sessionID) // 如果session不存在,则返回JSON-RPC错误响应,表示无效的session ID if !ok { s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID") return } session := sessionI.(*sseSession) // 在处理消息之前设置客户端上下文 ctx := s.server.WithContext(r.Context(), session) // 如果提供了自定义的上下文函数,则应用它 if s.contextFunc != nil { ctx = s.contextFunc(ctx, r) } // 将请求体解析为原始JSON消息 var rawMessage json.RawMessage if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil { // 如果解析失败,则返回JSON-RPC错误响应,表示解析错误 s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error") return } // 通过MCPServer处理消息 response := s.server.HandleMessage(ctx, rawMessage) // 如果存在响应(非通知),则发送响应 if response != nil { // 将响应编码为JSON格式 eventData, _ := json.Marshal(response) // 将事件排队以通过SSE发送 select { case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 事件成功排队 case <-session.done: // 会话已关闭,不尝试排队 default: // 队列已满,可以记录此情况 } // 发送HTTP响应 w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(response) } else { // 对于通知,只发送202 Accepted状态码,无响应体 w.WriteHeader(http.StatusAccepted) } }

 

需要注意的是HandleMessage方法,这是通过server/internal/gen/request_handler.go.tmpl生成的,也根据MCP协议实现的模版,其流程如下:
  1. 尝试将原始消息解析为对应请求类型。
  2. 如果解析失败,记录错误信息。
  3. 执行请求前的钩子函数。
  4. 调用对应的处理函数处理请求。
  5. 如果处理过程中发生错误,执行错误钩子函数并返回错误响应。
  6. 执行请求后的钩子函数。
  7. 返回成功响应。
以initialize请求为例:
 // 根据消息方法进行分情况处理 switch baseMessage.Method { // 初始化请求处理 case mcp.MethodInitialize: var request mcp.InitializeRequest var result *mcp.InitializeResult // 尝试将原始消息解析为初始化请求类型 if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil { // 如果解析失败,记录错误信息 err = &requestError{ id: baseMessage.ID, code: mcp.INVALID_REQUEST, err: &UnparseableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method}, } } else { // 执行初始化请求前的钩子函数 s.hooks.beforeInitialize(baseMessage.ID, &request) // 处理初始化请求 result, err = s.handleInitialize(ctx, baseMessage.ID, request) } // 如果处理过程中发生错误 if err != nil { // 执行错误钩子函数 s.hooks.onError(baseMessage.ID, baseMessage.Method, &request, err) // 返回错误响应 return err.ToJSONRPCError() } // 执行初始化请求后的钩子函数 s.hooks.afterInitialize(baseMessage.ID, &request, result) // 返回成功响应 return createResponse(baseMessage.ID, *result) // 其他方法处理逻辑类似,省略... // 如果方法不匹配任何已知方法,返回方法未找到错误响应 default: return createErrorResponse( baseMessage.ID, mcp.METHOD_NOT_FOUND, fmt.Sprintf("Method %s not found", baseMessage.Method), ) }

代码比较多,可以自行查看:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/request_handler.go#L12

五、总结

mark3labs/mcp-go框架是一个简单易用的MCP框架,基本上实现了MCP协议,提供对 MCP 核心规范的完整支持,包括资源(Resources)、工具(Tools)、提示(Prompts)等核心组件,确保与主流 LLM 客户端(如 Claude、Cline)的兼容性。尤其是mark3labs/mcp-go提供的hooks机制,可以让开发者更好的使用类似gin空间一样的中间件能力,比如实现统一鉴权等能力。除此之外,还可以与主流的Web框架,如gin框架进行集成,进一步扩展了mark3labs/mcp-go的适用性。
原文链接:https://my.oschina.net/qiangmzsx/blog/18014540
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章