一、引言
在
《万字长文,带你读懂 Anthropic MCP》中我们介绍了MCP的基本框架和组件,并初步说了在golang中的框架metoro-io/mcp-golang和mark3labs/mcp-go。本文将通过实践和源码的方式先解读mark3labs/mcp-go。
二、MCP-Server的简述
MCP Server一般为轻量的服务端程序,通过一种标准的协议(MCP)暴露出特定资源的一些特定的能力。
2.1 连接生命周期
2.1.1 初始化连接
-
客户端发送带有协议版本和功能initialize请求。
-
服务器以其协议版本和功能进行响应
-
客户端发送initialized通知作为确认
-
开始正常信息交换
2.1.2 信息交换
初始化后,支持以下模式:
-
请求-响应:客户端或服务器发送请求,对方响应
-
通知:任何一方发送单向消息
2.1.3 终止
enum ErrorCode {
// Standard JSON-RPC error codes
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603
}
任何一方都可以终止连接:
-
通过close()干净关闭
-
传输断开
-
错误情况
-
请求的错误响应
-
传输中的错误事件
-
协议级错误处理程序
常见错误码:
2.2 MCP Server的业务能力
| Request Method |
发起方 |
响应方 |
描述 |
| initialized |
Client |
Server |
初始化会话 |
| tools-list |
Client |
Server |
发现可用的工具 |
| tools/call |
Client |
Server |
调用工具 |
| resources/list |
Client |
Server |
发现可用的资源 |
| resources/read |
Client |
Server |
获取资源内容 |
| resources/templates |
Client |
Server |
发现可用的参数化资源 |
| resources/subscribe |
Client |
Server |
订阅特定资源,监听其变化事件 |
| prompts/list |
Client |
Server |
发现可用的提示词 |
| prompts/get |
Client |
Server |
获取特定提示词 |
| roots/list |
Server |
Client |
列出服务器有权访问的客户端文件系统根节点(暴露目录和文件) |
| sampling/create |
Server |
Client |
启用服务器的AI生成能力( sampling creation ) |
三、mark3labs/mcp-go示例
我们就以框架中的官方示例代码为引子一步步解读流程和框架代码。
package main
import (
"context"
"errors"
"fmt"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
// 创建MCP服务器
s := server.NewMCPServer(
"Demo 🚀", // 服务器名称
"1.0.0", // 服务器版本
)
// 添加工具
tool := mcp.NewTool("hello_world", // 工具名称
mcp.WithDescription("Say hello to someone"), // 工具描述
mcp.WithString("name", // 参数名称
mcp.Required(), // 参数是必需的
mcp.Description("Name of the person to greet"), // 参数描述
),
)
// 为工具添加处理器
s.AddTool(tool, helloHandler)
// 启动标准输入输出服务器
if err := server.ServeStdio(s); err != nil {
fmt.Printf("Server error: %v\n", err) // 打印服务器错误
}
}
func helloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 从请求参数中获取名字参数,并断言为字符串类型
name, ok := request.Params.Arguments["name"].(string)
if !ok {
// 如果断言失败,返回错误
return nil, errors.New("name must be a string")
}
// 返回包含问候语的结果
return mcp.NewToolResultText(fmt.Sprintf("Hello, %s!", name)), nil
}
3.1 搭建一个联调环境
就以上面的代码为例,将上面的代码编译为二进制命令:
$ go build -v -o server
再启动mcp inspetor:
$ npx -y @modelcontextprotocol/inspector ./server
![]()
这样一个简单的MCP Client和MCP Server就搭建好了,后续也为我们开发测试构建好了环境。
四、源码解读
在上面的代码中main函数中的第一个代码就是server.NewMCPServer,那我们就从MCPServer这个结构体入手。
4.1 MCPServer结构体
代码地址:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/server.go#L135
type MCPServer struct {
mu sync.RWMutex // 用于保护共享资源,确保并发访问时的数据一致性
name string // 服务器的名称,用于标识服务器
version string // 服务器的版本,用于跟踪和管理服务器的不同版本
instructions string // 服务器的指令,通常在初始化响应中返回给客户端,提供使用指南或帮助信息
resources map[string]resourceEntry // 存储服务器支持的资源及其处理函数
resourceTemplates map[string]resourceTemplateEntry // 存储资源模板及其处理函数,支持URI模板匹配多类似资源
prompts map[string]mcp.Prompt // 存储服务器支持的提示,用于与用户交互
promptHandlers map[string]PromptHandlerFunc // 存储处理提示请求的函数,每个提示对应一个处理函数
tools map[string]ServerTool // 存储服务器支持的工具及其处理函数
notificationHandlers map[string]NotificationHandlerFunc // 存储处理传入通知的函数,接收客户端通知并处理
capabilities serverCapabilities // 定义服务器支持的功能特性,包括资源、提示、工具和日志记录等
sessions sync.Map // 存储当前活跃的客户端会话,用于跟踪用户交互
initialized atomic.Bool // 使用原子操作标记服务器是否已初始化,确保线程安全
hooks *Hooks // 存储服务器钩子,允许在请求处理前后或返回错误前执行自定义逻辑
}
MCPServer 是 Model Control Protocol (MCP) 服务器的实现,用于处理包括资源、提示和工具在内的各种类型的请求。
4.2 MCPServer初始化
看看如何创建一个MCPServer对象。
func NewMCPServer(
name, version string,
opts ...ServerOption,
) *MCPServer {
s := &MCPServer{
resources: make(map[string]resourceEntry),
resourceTemplates: make(map[string]resourceTemplateEntry),
prompts: make(map[string]mcp.Prompt),
promptHandlers: make(map[string]PromptHandlerFunc),
tools: make(map[string]ServerTool),
name: name,
version: version,
notificationHandlers: make(map[string]NotificationHandlerFunc),
capabilities: serverCapabilities{
tools: nil,
resources: nil,
prompts: nil,
logging: false,
},
}
for _, opt := range opts {
opt(s)
}
return s
}
在NewMCPServer()方法中,我们需要关注的opts ...ServerOption,进一步看看ServerOption有哪些选项:
| 选项 |
功能 |
使用方式 |
| WithResourceCapabilities |
配置资源相关的服务器功能,如订阅和资源列表变化通知 |
WithResourceCapabilities(subscribe, listChanged bool) |
| WithHooks |
添加钩子函数,用于在请求处理前后执行特定逻辑 |
WithHooks(hooks *Hooks) |
| WithPromptCapabilities |
配置提示相关的服务器功能,如提示列表变化通知 |
WithPromptCapabilities(listChanged bool) |
| WithToolCapabilities |
配置工具相关的服务器功能,如工具列表变化通知 |
WithToolCapabilities(listChanged bool) |
| WithLogging |
启用服务器日志记录功能 |
WithLogging() |
| WithInstructions |
设置服务器指令,用于在初始化响应中返回给客户端 |
WithInstructions(instructions string) |
它接受一个 Hooks 类型的指针作为参数。允许在创建 MCPServer 实例时,为服务器添加自定义的钩子函数,这些钩子函数可以在请求处理前后或返回错误给客户端之前执行。
hooks机制对开发和流程是非常有效的。框架中给的hooks能力有:
| 字段名 |
类型 |
描述 |
| OnBeforeAny |
[]BeforeAnyHookFunc |
在请求被解析后但方法调用前执行的钩子函数。 |
| OnSuccess |
[]OnSuccessHookFunc |
在请求成功生成结果但结果尚未发送给客户端之前执行的钩子函数。 |
| OnError |
[]OnErrorHookFunc |
在请求解析或方法执行过程中发生错误时执行的钩子函数。 |
| OnBeforeInitialize |
[]OnBeforeInitializeFunc |
在处理初始化请求前执行的钩子函数。 |
| OnAfterInitialize |
[]OnAfterInitializeFunc |
在处理初始化请求后执行的钩子函数。 |
| OnBeforePing |
[]OnBeforePingFunc |
在处理 Ping 请求前执行的钩子函数。 |
| OnAfterPing |
[]OnAfterPingFunc |
在处理 Ping 请求后执行的钩子函数。 |
| OnBeforeListResources |
[]OnBeforeListResourcesFunc |
在处理列出资源请求前执行的钩子函数。 |
| OnAfterListResources |
[]OnAfterListResourcesFunc |
在处理列出资源请求后执行的钩子函数。 |
| OnBeforeListResourceTemplates |
[]OnBeforeListResourceTemplatesFunc |
在处理列出资源模板请求前执行的钩子函数。 |
| OnAfterListResourceTemplates |
[]OnAfterListResourceTemplatesFunc |
在处理列出资源模板请求后执行的钩子函数。 |
| OnBeforeReadResource |
[]OnBeforeReadResourceFunc |
在处理读取资源请求前执行的钩子函数。 |
| OnAfterReadResource |
[]OnAfterReadResourceFunc |
在处理读取资源请求后执行的钩子函数。 |
| OnBeforeListPrompts |
[]OnBeforeListPromptsFunc |
在处理列出提示请求前执行的钩子函数。 |
| OnAfterListPrompts |
[]OnAfterListPromptsFunc |
在处理列出提示请求后执行的钩子函数。 |
| OnBeforeGetPrompt |
[]OnBeforeGetPromptFunc |
在处理获取提示请求前执行的钩子函数。 |
| OnAfterGetPrompt |
[]OnAfterGetPromptFunc |
在处理获取提示请求后执行的钩子函数。 |
| OnBeforeListTools |
[]OnBeforeListToolsFunc |
在处理列出工具请求前执行的钩子函数。 |
| OnAfterListTools |
[]OnAfterListToolsFunc |
在处理列出工具请求后执行的钩子函数。 |
| OnBeforeCallTool |
[]OnBeforeCallToolFunc |
在处理调用工具请求前执行的钩子函数。 |
| OnAfterCallTool |
[]OnAfterCallToolFunc |
在处理调用工具请求后执行的钩子函数。 |
现在模拟创建一个完整的MCPServer:
hooks := &server.Hooks{}
hooks.AddBeforeAny(func(id any, method mcp.MCPMethod, message any) {
fmt.Printf("beforeAny: %s, %v, %v\n", method, id, message)
})
hooks.AddOnSuccess(func(id any, method mcp.MCPMethod, message any, result any) {
fmt.Printf("onSuccess: %s, %v, %v, %v\n", method, id, message, result)
})
hooks.AddOnError(func(id any, method mcp.MCPMethod, message any, err error) {
fmt.Printf("onError: %s, %v, %v, %v\n", method, id, message, err)
})
hooks.AddBeforeInitialize(func(id any, message *mcp.InitializeRequest) {
fmt.Printf("beforeInitialize: %v, %v\n", id, message)
})
hooks.AddAfterInitialize(func(id any, message *mcp.InitializeRequest, result *mcp.InitializeResult) {
fmt.Printf("afterInitialize: %v, %v, %v\n", id, message, result)
})
hooks.AddAfterCallTool(func(id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) {
fmt.Printf("afterCallTool: %v, %v, %v\n", id, message, result)
})
hooks.AddBeforeCallTool(func(id any, message *mcp.CallToolRequest) {
fmt.Printf("beforeCallTool: %v, %v\n", id, message)
})
// 创建MCP服务器
s := server.NewMCPServer(
"Demo 🚀", // 服务器名称
"1.0.0", // 服务器版本
server.WithLogging(),
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithInstructions("initialized"),
server.WithHooks(hooks),
)
4.3 Tools模块
4.3.1 创建Tool
mark3labs/mcp-go框架中创建Tool有两个方式:
-
mcp.NewTool(name string, opts ...ToolOption) Tool
-
mcp.NewToolWithRawSchema(name, description string, schema json.RawMessage) Tool
下面通过一段代码进行对比:
方式一:mcp.NewTool()
tool := mcp.NewTool("hello_world", // 工具名称
mcp.WithDescription("Say hello to someone"), // 工具描述
mcp.WithString("name", // 参数名称
mcp.Required(), // 参数是必需的
mcp.Description("Name of the person to greet"), // 参数描述
),
)
方式二:mcp.NewToolWithRawSchema()
rawSchema := json.RawMessage(`{
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name of the person to greet"}
},
"required": ["name"]
}`)
// Create a tool with raw schema
toolRS := mcp.NewToolWithRawSchema("hello_world_1", "Say hello to someone", rawSchema)
其中rawSchema的结构需要符合ToolInputSchema结构体:
type ToolInputSchema struct {
Type string `json:"type"`
Properties map[string]interface{} `json:"properties,omitempty"`
Required []string `json:"required,omitempty"`
}
需要注意的是Properties来源于jsonSchema,所以具备的需要校验属性,比如default、maximum、minimum、maxLength、minLength、enum等等。其中key为请求传入的参数字段,interface{}为对key的各种属性校验。
更建议使用方式一:mcp.NewTool()更加符合编码方式,也更好控制器生成的 jsonSchema 。
4.3.2 存放Tool
创建好Tool值之后,调用server的方法AddTool()将Tool添加进入,相当于web框架中的添加路由与相关handle的关系。相关代码如下:
// AddTool registers a new tool and its handler
func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) {
s.AddTools(ServerTool{Tool: tool, Handler: handler})
}
// AddTools registers multiple tools at once
func (s *MCPServer) AddTools(tools ...ServerTool) {
// 检查工具
if s.capabilities.tools == nil {
s.capabilities.tools = &toolCapabilities{}
}
// 加锁
s.mu.Lock()
// 遍历工具
for _, entry := range tools {
s.tools[entry.Tool.Name] = entry
}
// 获取初始化状态
initialized := s.initialized.Load()
s.mu.Unlock()
// 发送通知
if initialized {
s.sendNotificationToAllClients("notifications/tools/list_changed", nil)
}
}
可以看出Tool是放入到MCPServer的tools字段中,使用name作为key,ServerTool作为value,其中ServerTool结构如下:
type ServerTool struct {
Tool mcp.Tool
Handler ToolHandlerFunc
}
框架还提供了:
4.4 Resource模块
老规矩先来一个demo,再看器源码实现:
// Static resource example - exposing a README file
resource := mcp.NewResource(
"docs://readme",
"Project README",
mcp.WithResourceDescription("The project's README file"),
mcp.WithMIMEType("text/markdown"),
)
// Add resource with its handler
s.AddResource(resource, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
content, err := os.ReadFile("main.go")
if err != nil {
return nil, err
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: "docs://readme",
MIMEType: "text/markdown",
Text: string(content),
},
}, nil
})
在MCP中Resource是指允许Server公开可供客户端读取并用作交互上下文的数据和内容。有很多类型的Resource:
-
File contents 文件内容
-
Database records 数据库记录
-
API responses API响应
-
Live system data 实时系统数据
-
images 图像
-
Log files 日志文件
-
等等... ...
框架中Resource的结构体如下:
type Resource struct {
Annotated // 包含可选注解,用于告知客户端如何使用或显示对象
// 资源的URI
URI string `json:"uri"` // 资源的唯一标识符,用于定位和访问资源
// 资源的可读名称
//
// 客户端可以使用此名称来填充UI元素
Name string `json:"name"` // 资源的显示名称,便于用户理解和界面展示
// 对此资源所代表内容的描述
//
// 客户端可以使用此描述来帮助大型语言模型(LLM)理解可用资源
// 这可以看作是对模型的“提示”
Description string `json:"description,omitempty"` // 资源的详细描述,为模型提供上下文信息
// 如果已知,此资源的MIME类型
MIMEType string `json:"mimeType,omitempty"` // 资源的媒体类型,如text/plain、image/jpeg等,用于指示资源的内容格式
}
type Annotated struct {
Annotations *struct {
// 描述此对象或数据的预期客户是谁
//
// 它可以包含多个条目,以指示对多个受众有用的内容(例如,`["user", "assistant"]`)
Audience []Role `json:"audience,omitempty"` // 受众群体,指示数据对哪些角色或用户群体有用
// 描述此数据对服务器操作的重要性
//
// 值为1表示“最重要”,并指示数据实际上是必需的,而0表示“最不重要”,并指示数据完全是可选的
Priority float64 `json:"priority,omitempty"` // 优先级,表示数据对服务器操作的重要程度,范围从0(最不重要)到1(最重要)
} `json:"annotations,omitempty"`
}
其中:
-
URI用于定位一个具体资源的标识,场景的有http://、file://、postgres://等等还可以去https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml 看看。
-
MIMEType表示文件的类型,常见的有 text/html、image/png,这里也可以查看到更多:https://www.iana.org/assignments/media-types/media-types.xhtml。
func (s *MCPServer) AddResource(
resource mcp.Resource,
handler ResourceHandlerFunc,
) {
// 检查资源
if s.capabilities.resources == nil {
s.capabilities.resources = &resourceCapabilities{}
}
// 加锁
s.mu.Lock()
// 解锁(defer)
defer s.mu.Unlock()
// 存储资源
s.resources[resource.URI] = resourceEntry{
resource: resource,
handler: handler,
}
}
resource是放入到MCPServer的resources字段中,使用URI作为key,resourceEntry作为value,其中resourceEntry结构如下:
type resourceEntry struct {
resource mcp.Resource
handler ResourceHandlerFunc
}
框架还提供了添加Resource templates的功能,主要是针对动态资源,服务器可以公开 URI 模板 ,客户端可以使用它来构建有效的资源 URI。
// Dynamic resource example - user profiles by ID
template := mcp.NewResourceTemplate(
"users://{id}/profile",
"User Profile",
mcp.WithTemplateDescription("Returns user profile information"),
mcp.WithTemplateMIMEType("application/json"),
)
// Add template with its handler
s.AddResourceTemplate(template, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
// Extract ID from the URI using regex matching
// The server automatically matches URIs to templates
userID := extractIDFromURI(request.Params.URI)
profile := fmt.Sprintf("Hello %s", userID) // Your DB/API call here
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: request.Params.URI,
MIMEType: "application/json",
Text: profile,
},
}, nil
})
// extractIDFromURI 从给定的 URI 中提取用户 ID。
// 假设 URI 的格式为 "users://{id}/profile"。
func extractIDFromURI(uri string) string {
// 定义正则表达式来匹配 URI 中的 ID
re := regexp.MustCompile(`users://([^/]+)/profile`)
// 使用正则表达式查找匹配项
matches := re.FindStringSubmatch(uri)
// 如果找到了匹配项,并且匹配项的数量正确,则返回 ID
if len(matches) == 2 {
return matches[1]
}
// 如果没有找到匹配项,或者匹配项的数量不正确,则返回空字符串
return ""
}
4.5 Prompts添加
Prompts创建可重复使用的提示模板和工作流程,提示使服务器能够定义可重复使用的提示模板和工作流程。
// Simple greeting prompt
s.AddPrompt(mcp.NewPrompt("greeting",
mcp.WithPromptDescription("A friendly greeting prompt"),
mcp.WithArgument("name",
mcp.ArgumentDescription("Name of the person to greet"),
),
), func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
name := request.Params.Arguments["name"]
if name == "" {
name = "friend"
}
return mcp.NewGetPromptResult(
"A friendly greeting",
[]mcp.PromptMessage{
mcp.NewPromptMessage(
mcp.RoleAssistant,
mcp.NewTextContent(fmt.Sprintf("Hello, %s! How can I help you today?", name)),
),
},
), nil
})
Prompt的结构体如下:
// Prompt 表示服务器提供的提示或提示模板。
// 如果 Arguments 非空且包含元素,则表示该提示是一个模板,
// 在调用 prompts/get 时需要提供参数值。
// 如果 Arguments 为空或为 nil,则这是一个不需要参数的静态提示。
type Prompt struct {
// 提示或提示模板的名称。
Name string `json:"name"`
// 提示提供内容的可选描述。
Description string `json:"description,omitempty"`
// 用于模板化提示的参数列表。
// 参数的存在表明这是一个模板提示。
Arguments []PromptArgument `json:"arguments,omitempty"`
}
// PromptArgument 描述提示模板可以接受的参数。
// 当提示包含参数时,客户端在发出 prompts/get 请求时
// 必须为所有必需参数提供值。
type PromptArgument struct {
// 参数的名称。
Name string `json:"name"`
// 参数的可读描述。
Description string `json:"description,omitempty"`
// 此参数是否必须提供。
// 如果为 true,则客户端在调用 prompts/get 时必须包含此参数。
Required bool `json:"required,omitempty"`
}
可以通过mcp.NewPrompt方法来生成Prompt对象:
func NewPrompt(name string, opts ...PromptOption) Prompt {
prompt := Prompt{
Name: name,
}
for _, opt := range opts {
opt(&prompt)
}
return prompt
}
又见到了opts ...PromptOption,看看有哪些选项:
4.6 选择传输方式
现在代码已经来到了:
// 启动标准输入输出服务器
if err := server.ServeStdio(s); err != nil {
fmt.Printf("Server error: %v\n", err) // 打印服务器错误
}
MCP当前主要提供两类Stdio transport和HTTP with SSE transport 。
4.6.1 Stdio
创建StdioServer
StdioServer的结构体:
type StdioServer struct {
server *MCPServer
errLogger *log.Logger
contextFunc StdioContextFunc
}
其中在,重要的字段有server和contextFunc,给MCPServer进来就是为了具备MCP的能力,只是使用stdio的传输方式。contextFunc是为了让外部自定义的context可以进入到StdioServer,可以用于结束服务和控制超时.
// 为MCP Server 启用stdio
func ServeStdio(server *MCPServer, opts ...StdioOption) error {
// 创建Stdio服务器
s := NewStdioServer(server)
// 设置错误日志
s.SetErrorLogger(log.New(os.Stderr, "", log.LstdFlags))
// 应用选项
for _, opt := range opts {
opt(s)
}
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 设置信号通道
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
// 监听信号
go func() {
<-sigChan
cancel()
}()
// 开始监听
return s.Listen(ctx, os.Stdin, os.Stdout)
}
其中最关键的是Listen方法,它是 StdioServer 类型的一个关键方法,用于监听标准输入输出的 JSON-RPC 消息。
// Listen 启动监听,从提供的输入读取 JSON-RPC 消息,并将响应写入提供的输出。
// 它将持续运行,直到上下文被取消或发生错误。
// 如果在读取输入或写入输出时遇到问题,将返回错误。
func (s *StdioServer) Listen(
ctx context.Context, // 监听过程的上下文,用于控制生命周期和传递请求范围的信息
stdin io.Reader, // 标准输入流,用于读取客户端发送的 JSON-RPC 消息
stdout io.Writer, // 标准输出流,用于将响应写回客户端
) error {
// 由于标准输入输出只有一个客户端,因此设置一个静态客户端上下文,SessionId为stdio
if err := s.server.RegisterSession(&stdioSessionInstance); err != nil {
// 如果会话注册失败,返回错误
return fmt.Errorf("register session: %w", err)
}
// 确保在函数结束时注销会话
defer s.server.UnregisterSession(stdioSessionInstance.SessionID())
// 更新上下文,将会话信息加入
ctx = s.server.WithContext(ctx, &stdioSessionInstance)
// 如果存在自定义上下文函数,则应用该函数修改上下文
if s.contextFunc != nil {
ctx = s.contextFunc(ctx)
}
// 创建一个带缓冲的读取器,用于从标准输入流高效读取数据
reader := bufio.NewReader(stdin)
// 启动一个协程专门处理通知
go func() {
for {
select {
case notification := <-stdioSessionInstance.notifications:
// 收到通知时,调用 writeResponse 方法将通知写入标准输出
err := s.writeResponse(notification, stdout)
if err != nil {
// 如果写入通知时出错,记录错误日志
s.errLogger.Printf("Error writing notification: %v", err)
}
case <-ctx.Done():
// 如果上下文完成,退出协程
return
}
}
}()
// 主循环,用于处理输入消息
for {
select {
case <-ctx.Done():
// 如果上下文完成,返回上下文错误
return ctx.Err()
default:
// 使用协程使读取操作可取消
readChan := make(chan string, 1) // 用于接收读取到的行
errChan := make(chan error, 1) // 用于接收读取错误
go func() {
line, err := reader.ReadString('\n') // 读取一行输入
if err != nil {
errChan <- err // 发送读取错误
return
}
readChan <- line // 发送读取到的行
}()
select {
case <-ctx.Done():
// 如果上下文完成,返回上下文错误
return ctx.Err()
case err := <-errChan:
// 处理读取错误
if err == io.EOF {
// 如果是文件结束符,表示输入结束,返回 nil
return nil
}
// 其他错误则记录日志并返回
s.errLogger.Printf("Error reading input: %v", err)
return err
case line := <-readChan:
// 处理读取到的行
if err := s.processMessage(ctx, line, stdout); err != nil {
// 如果处理消息时出错
if err == io.EOF {
// 如果是文件结束符,返回 nil
return nil
}
// 其他错误则记录日志并返回
s.errLogger.Printf("Error handling message: %v", err)
return err
}
}
}
}
}
4.6.2 SSE
SSE模式凭借其分布式能力、实时性和架构灵活性,成为MCP在 企业级应用、云端协作、动态数据流处理等场景的首选。所以对SSE的支持程度和易用程度,对于一个MCP框架而言是非常重要的。
mark3labs/mcp-go如何支持SSE呢?看看如下代码:
mcpServer := NewMCPServer()
sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080"))
log.Printf("SSE server listening on :8080")
if err := sseServer.Start(":8080"); err != nil {
log.Fatalf("Server error: %v", err)
}
现在已经知道如何在mark3labs/mcp-go中启用SSE了,现在来分析一下,它是如何实现的。
创建SSEServer
SSEServer的结构体如下:
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
// It provides real-time communication capabilities over HTTP using the SSE protocol.
type SSEServer struct {
server *MCPServer // MCPServer 实例,用于处理实际的消息传递和通信逻辑
baseURL string // SSE 服务器的基础 URL,用于构建完整的端点路径
basePath string // SSE 服务器的基础路径,通常用于区分不同的服务或版本
messageEndpoint string // 消息端点的路径,客户端通过此端点发送 JSON-RPC 消息
sseEndpoint string // SSE 端点的路径,客户端通过此端点建立 SSE 连接
sessions sync.Map // 存储活动 SSE 会话的同步映射,用于跟踪和管理客户端连接
srv *http.Server // 内部的 HTTP 服务器实例,用于处理 HTTP 请求和响应
contextFunc SSEContextFunc // 可选的上下文函数,用于根据请求内容自定义上下文
}
与之前的StdioServer相比,SSEServer多了用于http中使用的URI、path,还有srv这是一个httpServer的类型,用于支持HTTP请求和响应。继续查看如何构建SSEServer:
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
s := &SSEServer{
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
}
// Apply all options
for _, opt := range opts {
opt(s)
}
return s
}
还是采用的才是经典Option模式,继续看看SSEOption有哪些选项:
| 名称 |
功能 |
使用方式 |
| WithBaseURL |
设置 SSE 服务器的基础 URL |
WithBaseURL("https://example.com") |
| WithBasePath |
设置 SSE 服务器的基础路径 |
WithBasePath("/v1") |
| WithMessageEndpoint |
设置消息端点的路径 |
WithMessageEndpoint("/custom-message") |
| WithSSEEndpoint |
设置 SSE 端点的路径 |
WithSSEEndpoint("/custom-sse") |
| WithHTTPServer |
设置 HTTP 服务器实例(通常用于测试或自定义服务器配置) |
WithHTTPServer(customHttpServer) |
| WithContextFunc |
设置一个函数,用于根据请求内容自定义上下文 |
WithContextFunc(func(ctx context.Context, r *http.Request) context.Context { ... }) |
WithHTTPServer适用于需要自定义服务器配置的场景,为后续替换更好性能的http服务实例打下基础,也体现了mark3labs/mcp-go扩展性。
剩下的Start()就是常见的启动http服务实例的功能了。
func (s *SSEServer) Start(addr string) error {
s.srv = &http.Server{
Addr: addr,
Handler: s,
}
return s.srv.ListenAndServe()
}
还可以调用Shutdown()实现对服务器的关闭。
4.6.3 集成到gin框架
在实际开发中,很多公司内部的业务有自己的框架,集成了许许多多的独特功能。总不能为了使用MCP重写一套Web框架,此时就需要使用到mark3labs/mcp-go集成到Web框架的能力了。下面以gin框架为例:
// 创建一个新的 Gin 引擎
r := gin.Default()
// 创建一个新的 MCPServer 实例(假设这是 SSEServer 所需的)
mcpServer := server.NewMCPServer("gin-mcp-server", "1.0.0") // 根据你的实际代码调整
// mcpServer 新加Tool、Resource、Prompt
// ... ...
// 创建一个新的 SSEServer 实例,并传入 MCPServer
sseServer := server.NewSSEServer(mcpServer)
// 将 SSEServer 的 SSE 端点和处理函数集成到 Gin 路由中
r.GET(sseServer.CompleteSsePath(), func(c *gin.Context) {
sseServer.ServeHTTP(c.Writer, c.Request)
})
// 将 SSEServer 的消息端点和处理函数集成到 Gin 路由中
r.POST(sseServer.CompleteMessagePath(), func(c *gin.Context) {
sseServer.ServeHTTP(c.Writer, c.Request)
})
// 启动 Gin 服务器
if err := r.Run("localhost:8081"); err != nil {
log.Fatalf("Gin server startup failed: %v", err)
}
上述的代码会生成两个路由:
|
路由
|
方法
|
作用
|
实例
|
|
/sse
|
GET
|
|
请求:
curl 'http://localhost:3000/sse?transportType=sse&url=http%3A%2F%2Flocalhost%3A8081%2Fsse' \
-H 'Accept: */*' \
-H 'Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7' \
-H 'Cache-Control: no-cache' \
-H 'Connection: keep-alive' \
-H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
响应:
event: endpoint
data: /message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766
|
|
/message
|
POST
|
|
请求:
curl 'http://localhost:3000/message?sessionId=fee6d6df-d394-4b4d-a748-fddcc73fb766' \
-H 'Accept: */*' \
-H 'Cache-Control: no-cache' \
-H 'Connection: keep-alive' \
-H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' \
-H 'content-type: application/json'
--data-raw '{"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{"sampling":{},"roots":{"listChanged":true}},"clientInfo":{"name":"mcp-inspector","version":"0.7.0"}},"jsonrpc":"2.0","id":0}'
/message的响应:
{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}}
/sse收到的响应:
event: message
data: {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2024-11-05","capabilities":{"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{}},"serverInfo":{"name":"example-servers/everything","version":"1.0.0"}}}
|
4.7 处理请求
之前的内容,解析了如何构建MCP Server的实践和背后的实现。下面我们还需要了解mark3labs/mcp-go如何接受请求并进行响应的。
4.7.1 路由入口
从SSEServer结构体中已经知道使用的http.Server,所以其接受请求的入口为ServeHTTP方法,实现了 http.Handler 接口,用于处理 HTTP 请求。根据请求的路径,它会将请求分发到不同的处理方法(handleSSE 或 handleMessage),或者返回 404 未找到。
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 获取请求的路径
path := r.URL.Path
// 使用精确路径匹配,而不是模糊包含
ssePath := s.CompleteSsePath() // 获取完整的 SSE 路径
if ssePath != "" && path == ssePath {
// 如果请求路径与 SSE 路径匹配,则处理 SSE 请求
s.handleSSE(w, r)
return // 处理完成后直接返回,不再继续后续逻辑
}
// 获取消息处理的完整路径
messagePath := s.CompleteMessagePath()
if messagePath != "" && path == messagePath {
// 如果请求路径与消息处理路径匹配,则处理消息请求
s.handleMessage(w, r)
return // 处理完成后直接返回,不再继续后续逻辑
}
// 如果请求路径不匹配任何已知路径,则返回 404 未找到
http.NotFound(w, r)
}
其中:
这些信息都是可以在NewSSEServer()方法中设置。
需要详细查看的是 s.handleSSE(w, r)和s.handleMessage(w, r)方法,他们分别处理/see和/message的请求。
4.7.2 handleSSE
handleSSE实现了一个处理服务器发送事件(SSE)的HTTP处理器。SSE是一种允许服务器向客户端发送自动更新的技术。主要流程:
-
请求方法检查:只允许GET请求。
-
设置响应头:设置适当的SSE响应头。
-
创建会话:为每个客户端创建一个新的SSE会话。
-
注册和注销会话:在服务器中注册会话,并在处理完成后注销。
-
通知处理器:启动一个goroutine处理来自通知通道的事件,并将其发送到客户端。
-
主事件循环:处理来自事件队列的事件,并将其发送到客户端。
// 它设置适当的头信息并为客户端创建一个新的会话。
func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
// 1. 检查请求方法是否为GET,如果不是,返回405 Method Not Allowed错误
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// 2. 设置SSE相关的响应头
w.Header().Set("Content-Type", "text/event-stream") // 设置内容类型为text/event-stream
w.Header().Set("Cache-Control", "no-cache") // 禁用缓存
w.Header().Set("Connection", "keep-alive") // 保持连接活跃
w.Header().Set("Access-Control-Allow-Origin", "*") // 允许所有域跨域请求
// 检查ResponseWriter是否支持Flush,如果不支持,返回500 Internal Server Error错误
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
// 3. 创建一个新的会话ID和会话对象
sessionID := uuid.New().String() // 生成唯一的会话ID
session := &sseSession{
writer: w, // 响应写入器
flusher: flusher, // 刷新器
done: make(chan struct{}), // 用于通知会话结束的通道
eventQueue: make(chan string, 100), // 事件队列,缓冲区大小为100
sessionID: sessionID, // 会话ID
notificationChannel: make(chan mcp.JSONRPCNotification, 100), // 通知通道,缓冲区大小为100
}
// 4. 将会话存储到会话存储中,并在处理完成后删除
s.sessions.Store(sessionID, session)
defer s.sessions.Delete(sessionID)
// 在服务器中注册会话,如果注册失败,返回500 Internal Server Error错误
if err := s.server.RegisterSession(session); err != nil {
http.Error(w, fmt.Sprintf("Session registration failed: %v", err), http.StatusInternalServerError)
return
}
// 在处理完成后注销会话
defer s.server.UnregisterSession(sessionID)
// 5. 启动一个goroutine处理通知通道中的事件
go func() {
for {
select {
case notification := <-session.notificationChannel: // 从通知通道接收通知
eventData, err := json.Marshal(notification) // 将通知序列化为JSON
if err == nil {
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 将事件发送到事件队列
// 事件成功入队
case <-session.done: // 如果会话结束,退出goroutine
return
}
}
case <-session.done: // 如果会话结束,退出goroutine
return
case <-r.Context().Done(): // 如果请求上下文被取消,退出goroutine
return
}
}
}()
// 生成消息端点URL并发送初始的endpoint事件
messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID)
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) // 发送endpoint事件
flusher.Flush() // 刷新响应,确保事件立即发送到客户端
// 6. 主事件循环,运行在HTTP处理器goroutine中
for {
select {
case event := <-session.eventQueue: // 从事件队列接收事件
fmt.Fprint(w, event) // 将事件写入响应
flusher.Flush() // 刷新响应,确保事件立即发送到客户端
case <-r.Context().Done(): // 如果请求上下文被取消,关闭会话并退出
close(session.done)
return
}
}
}
第三阶段又看到Session了,与之前的stdioSession相比,sseSession明显复杂多了,它们都实现接口ClientSession:
type ClientSession interface {
// NotificationChannel provides a channel suitable for sending notifications to client.
NotificationChannel() chan<- mcp.JSONRPCNotification
// SessionID is a unique identifier used to track user session.
SessionID() string
}
sseSession是专门用于表示一个基于服务器发送事件(Server-Sent Events, SSE)协议的活跃连接。sseSession负责管理客户端与服务器之间的单向实时数据推送,并保持会话。其结构体如下:
type sseSession struct {
writer http.ResponseWriter // HTTP响应写入器,用于向客户端发送数据
flusher http.Flusher // HTTP刷新器,用于刷新响应缓冲区,确保数据立即发送给客户端
done chan struct{} // 用于通知会话结束的通道
eventQueue chan string // 用于排队事件的通道,存储待发送给客户端的事件
sessionID string // 会话的唯一标识符
notificationChannel chan mcp.JSONRPCNotification // 用于接收JSON-RPC通知的通道
}
4.7.3 handleMessage
handleMessage 方法是 SSEServer 类型的一个方法,用于处理传入的 JSON-RPC 消息,并通过 SSE 连接和 HTTP 响应返回结果。其主要流程:
-
请求验证:检查请求方法
-
检查请求方法:方法首先检查请求方法是否为 HTTP POST。如果不是,返回 "Method not allowed" 错误,并终止处理。
-
验证 sessionId 参数:从请求的 URL 查询参数中获取 sessionId。如果缺失,返回 "Missing sessionId" 错误。
-
加载会话:使用 sessionId 从会话存储中加载会话。如果会话不存在,返回 "Invalid session ID" 错误。
-
设置上下文
-
调用 s.server.WithContext 方法,将请求上下文和会话信息合并,生成新的上下文。如果提供了 contextFunc,则进一步处理上下文。
-
解析 JSON-RPC 消息
-
使用 json.NewDecoder 解析请求体中的原始 JSON 消息。如果解析失败,返回 "Parse error" 错误。
-
处理消息
-
将解析后的消息传递给 s.server.HandleMessage 方法进行处理,并获取响应结果。
-
发送响应
如果 HandleMessage 返回了响应(非通知),则:
如果 HandleMessage 没有返回响应(通知),则仅设置 HTTP 响应状态码为 202 Accepted,不发送响应体。
-
将响应编码为 JSON 格式。
-
将响应事件加入会话的事件队列,供 SSE 连接发送。
-
设置 HTTP 响应头为 application/json,状态码为 202 Accepted,并发送响应体。
-
事件队列处理
尝试将事件加入会话的事件队列。如果队列已满或会话已关闭,则丢弃事件。
那么handleSSE和handleMessage的关系是怎样的呢?使用一张图来说明:
// handleMessage 处理来自客户端的JSON-RPC消息,并通过SSE连接和HTTP响应返回结果
func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
// 1. 如果请求方法不是POST,则返回JSON-RPC错误响应,表示方法不允许
if r.Method != http.MethodPost {
s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
return
}
// 从请求的URL查询参数中获取sessionId
sessionID := r.URL.Query().Get("sessionId")
// 如果sessionId为空,则返回JSON-RPC错误响应,表示缺少sessionId参数
if sessionID == "" {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
return
}
// 从session存储中加载与sessionId对应的session
sessionI, ok := s.sessions.Load(sessionID)
// 如果session不存在,则返回JSON-RPC错误响应,表示无效的session ID
if !ok {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
return
}
session := sessionI.(*sseSession)
// 在处理消息之前设置客户端上下文
ctx := s.server.WithContext(r.Context(), session)
// 如果提供了自定义的上下文函数,则应用它
if s.contextFunc != nil {
ctx = s.contextFunc(ctx, r)
}
// 将请求体解析为原始JSON消息
var rawMessage json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
// 如果解析失败,则返回JSON-RPC错误响应,表示解析错误
s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
return
}
// 通过MCPServer处理消息
response := s.server.HandleMessage(ctx, rawMessage)
// 如果存在响应(非通知),则发送响应
if response != nil {
// 将响应编码为JSON格式
eventData, _ := json.Marshal(response)
// 将事件排队以通过SSE发送
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
// 事件成功排队
case <-session.done:
// 会话已关闭,不尝试排队
default:
// 队列已满,可以记录此情况
}
// 发送HTTP响应
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
} else {
// 对于通知,只发送202 Accepted状态码,无响应体
w.WriteHeader(http.StatusAccepted)
}
}
需要注意的是HandleMessage方法,这是通过server/internal/gen/request_handler.go.tmpl生成的,也根据MCP协议实现的模版,其流程如下:
-
尝试将原始消息解析为对应请求类型。
-
如果解析失败,记录错误信息。
-
执行请求前的钩子函数。
-
调用对应的处理函数处理请求。
-
如果处理过程中发生错误,执行错误钩子函数并返回错误响应。
-
执行请求后的钩子函数。
-
返回成功响应。
以initialize请求为例:
// 根据消息方法进行分情况处理
switch baseMessage.Method {
// 初始化请求处理
case mcp.MethodInitialize:
var request mcp.InitializeRequest
var result *mcp.InitializeResult
// 尝试将原始消息解析为初始化请求类型
if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil {
// 如果解析失败,记录错误信息
err = &requestError{
id: baseMessage.ID,
code: mcp.INVALID_REQUEST,
err: &UnparseableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method},
}
} else {
// 执行初始化请求前的钩子函数
s.hooks.beforeInitialize(baseMessage.ID, &request)
// 处理初始化请求
result, err = s.handleInitialize(ctx, baseMessage.ID, request)
}
// 如果处理过程中发生错误
if err != nil {
// 执行错误钩子函数
s.hooks.onError(baseMessage.ID, baseMessage.Method, &request, err)
// 返回错误响应
return err.ToJSONRPCError()
}
// 执行初始化请求后的钩子函数
s.hooks.afterInitialize(baseMessage.ID, &request, result)
// 返回成功响应
return createResponse(baseMessage.ID, *result)
// 其他方法处理逻辑类似,省略...
// 如果方法不匹配任何已知方法,返回方法未找到错误响应
default:
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
fmt.Sprintf("Method %s not found", baseMessage.Method),
)
}
代码比较多,可以自行查看:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/request_handler.go#L12
五、总结
mark3labs/mcp-go框架是一个简单易用的MCP框架,基本上实现了MCP协议,提供对 MCP 核心规范的完整支持,包括资源(Resources)、工具(Tools)、提示(Prompts)等核心组件,确保与主流 LLM 客户端(如 Claude、Cline)的兼容性。尤其是mark3labs/mcp-go提供的hooks机制,可以让开发者更好的使用类似gin空间一样的中间件能力,比如实现统一鉴权等能力。除此之外,还可以与主流的Web框架,如gin框架进行集成,进一步扩展了mark3labs/mcp-go的适用性。