万字长文解读MCP框架,让你掌握mark3labs/mcp-go
一、引言
二、MCP-Server的简述
2.1 连接生命周期
2.1.1 初始化连接
- 客户端发送带有协议版本和功能initialize请求。
- 服务器以其协议版本和功能进行响应
- 客户端发送initialized通知作为确认
- 开始正常信息交换
2.1.2 信息交换
- 请求-响应:客户端或服务器发送请求,对方响应
- 通知:任何一方发送单向消息
2.1.3 终止
enum ErrorCode { // Standard JSON-RPC error codes ParseError = -32700, InvalidRequest = -32600, MethodNotFound = -32601, InvalidParams = -32602, InternalError = -32603 }
任何一方都可以终止连接: - 通过close()干净关闭
- 传输断开
- 错误情况
- 请求的错误响应
- 传输中的错误事件
- 协议级错误处理程序
-
2.2 MCP Server的业务能力
Request Method | 发起方 | 响应方 | 描述 |
initialized | Client | Server | 初始化会话 |
tools-list | Client | Server | 发现可用的工具 |
tools/call | Client | Server | 调用工具 |
resources/list | Client | Server | 发现可用的资源 |
resources/read | Client | Server | 获取资源内容 |
resources/templates | Client | Server | 发现可用的参数化资源 |
resources/subscribe | Client | Server | 订阅特定资源,监听其变化事件 |
prompts/list | Client | Server | 发现可用的提示词 |
prompts/get | Client | Server | 获取特定提示词 |
roots/list | Server | Client | 列出服务器有权访问的客户端文件系统根节点(暴露目录和文件) |
sampling/create | Server | Client | 启用服务器的AI生成能力( sampling creation ) |
三、mark3labs/mcp-go示例
package main import ( "context" "errors" "fmt" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" ) func main() { // 创建MCP服务器 s := server.NewMCPServer( "Demo 🚀", // 服务器名称 "1.0.0", // 服务器版本 ) // 添加工具 tool := mcp.NewTool("hello_world", // 工具名称 mcp.WithDescription("Say hello to someone"), // 工具描述 mcp.WithString("name", // 参数名称 mcp.Required(), // 参数是必需的 mcp.Description("Name of the person to greet"), // 参数描述 ), ) // 为工具添加处理器 s.AddTool(tool, helloHandler) // 启动标准输入输出服务器 if err := server.ServeStdio(s); err != nil { fmt.Printf("Server error: %v\n", err) // 打印服务器错误 } } func helloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { // 从请求参数中获取名字参数,并断言为字符串类型 name, ok := request.Params.Arguments["name"].(string) if !ok { // 如果断言失败,返回错误 return nil, errors.New("name must be a string") } // 返回包含问候语的结果 return mcp.NewToolResultText(fmt.Sprintf("Hello, %s!", name)), nil }
3.1 搭建一个联调环境
$ go build -v -o server
再启动mcp inspetor:
$ npx -y @modelcontextprotocol/inspector ./server
四、源码解读
4.1 MCPServer结构体
type MCPServer struct { mu sync.RWMutex // 用于保护共享资源,确保并发访问时的数据一致性 name string // 服务器的名称,用于标识服务器 version string // 服务器的版本,用于跟踪和管理服务器的不同版本 instructions string // 服务器的指令,通常在初始化响应中返回给客户端,提供使用指南或帮助信息 resources map[string]resourceEntry // 存储服务器支持的资源及其处理函数 resourceTemplates map[string]resourceTemplateEntry // 存储资源模板及其处理函数,支持URI模板匹配多类似资源 prompts map[string]mcp.Prompt // 存储服务器支持的提示,用于与用户交互 promptHandlers map[string]PromptHandlerFunc // 存储处理提示请求的函数,每个提示对应一个处理函数 tools map[string]ServerTool // 存储服务器支持的工具及其处理函数 notificationHandlers map[string]NotificationHandlerFunc // 存储处理传入通知的函数,接收客户端通知并处理 capabilities serverCapabilities // 定义服务器支持的功能特性,包括资源、提示、工具和日志记录等 sessions sync.Map // 存储当前活跃的客户端会话,用于跟踪用户交互 initialized atomic.Bool // 使用原子操作标记服务器是否已初始化,确保线程安全 hooks *Hooks // 存储服务器钩子,允许在请求处理前后或返回错误前执行自定义逻辑 }
MCPServer 是 Model Control Protocol (MCP) 服务器的实现,用于处理包括资源、提示和工具在内的各种类型的请求。
4.2 MCPServer初始化
func NewMCPServer( name, version string, opts ...ServerOption, ) *MCPServer { s := &MCPServer{ resources: make(map[string]resourceEntry), resourceTemplates: make(map[string]resourceTemplateEntry), prompts: make(map[string]mcp.Prompt), promptHandlers: make(map[string]PromptHandlerFunc), tools: make(map[string]ServerTool), name: name, version: version, notificationHandlers: make(map[string]NotificationHandlerFunc), capabilities: serverCapabilities{ tools: nil, resources: nil, prompts: nil, logging: false, }, } for _, opt := range opts { opt(s) } return s }
在NewMCPServer()方法中,我们需要关注的opts ...ServerOption,进一步看看ServerOption有哪些选项:
选项 | 功能 | 使用方式 |
WithResourceCapabilities | 配置资源相关的服务器功能,如订阅和资源列表变化通知 | WithResourceCapabilities(subscribe, listChanged bool) |
WithHooks | 添加钩子函数,用于在请求处理前后执行特定逻辑 | WithHooks(hooks *Hooks) |
WithPromptCapabilities | 配置提示相关的服务器功能,如提示列表变化通知 | WithPromptCapabilities(listChanged bool) |
WithToolCapabilities | 配置工具相关的服务器功能,如工具列表变化通知 | WithToolCapabilities(listChanged bool) |
WithLogging | 启用服务器日志记录功能 | WithLogging() |
WithInstructions | 设置服务器指令,用于在初始化响应中返回给客户端 | WithInstructions(instructions string) |
字段名 | 类型 | 描述 |
OnBeforeAny | []BeforeAnyHookFunc | 在请求被解析后但方法调用前执行的钩子函数。 |
OnSuccess | []OnSuccessHookFunc | 在请求成功生成结果但结果尚未发送给客户端之前执行的钩子函数。 |
OnError | []OnErrorHookFunc | 在请求解析或方法执行过程中发生错误时执行的钩子函数。 |
OnBeforeInitialize | []OnBeforeInitializeFunc | 在处理初始化请求前执行的钩子函数。 |
OnAfterInitialize | []OnAfterInitializeFunc | 在处理初始化请求后执行的钩子函数。 |
OnBeforePing | []OnBeforePingFunc | 在处理 Ping 请求前执行的钩子函数。 |
OnAfterPing | []OnAfterPingFunc | 在处理 Ping 请求后执行的钩子函数。 |
OnBeforeListResources | []OnBeforeListResourcesFunc | 在处理列出资源请求前执行的钩子函数。 |
OnAfterListResources | []OnAfterListResourcesFunc | 在处理列出资源请求后执行的钩子函数。 |
OnBeforeListResourceTemplates | []OnBeforeListResourceTemplatesFunc | 在处理列出资源模板请求前执行的钩子函数。 |
OnAfterListResourceTemplates | []OnAfterListResourceTemplatesFunc | 在处理列出资源模板请求后执行的钩子函数。 |
OnBeforeReadResource | []OnBeforeReadResourceFunc | 在处理读取资源请求前执行的钩子函数。 |
OnAfterReadResource | []OnAfterReadResourceFunc | 在处理读取资源请求后执行的钩子函数。 |
OnBeforeListPrompts | []OnBeforeListPromptsFunc | 在处理列出提示请求前执行的钩子函数。 |
OnAfterListPrompts | []OnAfterListPromptsFunc | 在处理列出提示请求后执行的钩子函数。 |
OnBeforeGetPrompt | []OnBeforeGetPromptFunc | 在处理获取提示请求前执行的钩子函数。 |
OnAfterGetPrompt | []OnAfterGetPromptFunc | 在处理获取提示请求后执行的钩子函数。 |
OnBeforeListTools | []OnBeforeListToolsFunc | 在处理列出工具请求前执行的钩子函数。 |
OnAfterListTools | []OnAfterListToolsFunc | 在处理列出工具请求后执行的钩子函数。 |
OnBeforeCallTool | []OnBeforeCallToolFunc | 在处理调用工具请求前执行的钩子函数。 |
OnAfterCallTool | []OnAfterCallToolFunc | 在处理调用工具请求后执行的钩子函数。 |
hooks := &server.Hooks{} hooks.AddBeforeAny(func(id any, method mcp.MCPMethod, message any) { fmt.Printf("beforeAny: %s, %v, %v\n", method, id, message) }) hooks.AddOnSuccess(func(id any, method mcp.MCPMethod, message any, result any) { fmt.Printf("onSuccess: %s, %v, %v, %v\n", method, id, message, result) }) hooks.AddOnError(func(id any, method mcp.MCPMethod, message any, err error) { fmt.Printf("onError: %s, %v, %v, %v\n", method, id, message, err) }) hooks.AddBeforeInitialize(func(id any, message *mcp.InitializeRequest) { fmt.Printf("beforeInitialize: %v, %v\n", id, message) }) hooks.AddAfterInitialize(func(id any, message *mcp.InitializeRequest, result *mcp.InitializeResult) { fmt.Printf("afterInitialize: %v, %v, %v\n", id, message, result) }) hooks.AddAfterCallTool(func(id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) { fmt.Printf("afterCallTool: %v, %v, %v\n", id, message, result) }) hooks.AddBeforeCallTool(func(id any, message *mcp.CallToolRequest) { fmt.Printf("beforeCallTool: %v, %v\n", id, message) }) // 创建MCP服务器 s := server.NewMCPServer( "Demo 🚀", // 服务器名称 "1.0.0", // 服务器版本 server.WithLogging(), server.WithToolCapabilities(true), server.WithResourceCapabilities(true, true), server.WithPromptCapabilities(true), server.WithInstructions("initialized"), server.WithHooks(hooks), )
4.3 Tools模块
4.3.1 创建Tool
- mcp.NewTool(name string, opts ...ToolOption) Tool
- mcp.NewToolWithRawSchema(name, description string, schema json.RawMessage) Tool
方式一:mcp.NewTool() tool := mcp.NewTool("hello_world", // 工具名称 mcp.WithDescription("Say hello to someone"), // 工具描述 mcp.WithString("name", // 参数名称 mcp.Required(), // 参数是必需的 mcp.Description("Name of the person to greet"), // 参数描述 ), ) 方式二:mcp.NewToolWithRawSchema() rawSchema := json.RawMessage(`{ "type": "object", "properties": { "name": {"type": "string", "description": "Name of the person to greet"} }, "required": ["name"] }`) // Create a tool with raw schema toolRS := mcp.NewToolWithRawSchema("hello_world_1", "Say hello to someone", rawSchema)
其中rawSchema的结构需要符合ToolInputSchema结构体:
type ToolInputSchema struct { Type string `json:"type"` Properties map[string]interface{} `json:"properties,omitempty"` Required []string `json:"required,omitempty"` }
需要注意的是Properties来源于jsonSchema,所以具备的需要校验属性,比如default、maximum、minimum、maxLength、minLength、enum等等。其中key为请求传入的参数字段,interface{}为对key的各种属性校验。
4.3.2 存放Tool
// AddTool registers a new tool and its handler func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) { s.AddTools(ServerTool{Tool: tool, Handler: handler}) } // AddTools registers multiple tools at once func (s *MCPServer) AddTools(tools ...ServerTool) { // 检查工具 if s.capabilities.tools == nil { s.capabilities.tools = &toolCapabilities{} } // 加锁 s.mu.Lock() // 遍历工具 for _, entry := range tools { s.tools[entry.Tool.Name] = entry } // 获取初始化状态 initialized := s.initialized.Load() s.mu.Unlock() // 发送通知 if initialized { s.sendNotificationToAllClients("notifications/tools/list_changed", nil) } }
可以看出Tool是放入到MCPServer的tools字段中,使用name作为key,ServerTool作为value,其中ServerTool结构如下:
type ServerTool struct { Tool mcp.Tool Handler ToolHandlerFunc }
框架还提供了:
- DeleteTools(names... string)作为删除Tool关联的方法。
- SetTools(tools ...ServerTool)可以设置当前所有的Tool 列表。
4.4 Resource模块
// Static resource example - exposing a README file resource := mcp.NewResource( "docs://readme", "Project README", mcp.WithResourceDescription("The project's README file"), mcp.WithMIMEType("text/markdown"), ) // Add resource with its handler s.AddResource(resource, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { content, err := os.ReadFile("main.go") if err != nil { return nil, err } return []mcp.ResourceContents{ mcp.TextResourceContents{ URI: "docs://readme", MIMEType: "text/markdown", Text: string(content), }, }, nil })
- File contents 文件内容
- Database records 数据库记录
- API responses API响应
- Live system data 实时系统数据
- images 图像
- Log files 日志文件
- 等等... ...
type Resource struct { Annotated // 包含可选注解,用于告知客户端如何使用或显示对象 // 资源的URI URI string `json:"uri"` // 资源的唯一标识符,用于定位和访问资源 // 资源的可读名称 // // 客户端可以使用此名称来填充UI元素 Name string `json:"name"` // 资源的显示名称,便于用户理解和界面展示 // 对此资源所代表内容的描述 // // 客户端可以使用此描述来帮助大型语言模型(LLM)理解可用资源 // 这可以看作是对模型的“提示” Description string `json:"description,omitempty"` // 资源的详细描述,为模型提供上下文信息 // 如果已知,此资源的MIME类型 MIMEType string `json:"mimeType,omitempty"` // 资源的媒体类型,如text/plain、image/jpeg等,用于指示资源的内容格式 } type Annotated struct { Annotations *struct { // 描述此对象或数据的预期客户是谁 // // 它可以包含多个条目,以指示对多个受众有用的内容(例如,`["user", "assistant"]`) Audience []Role `json:"audience,omitempty"` // 受众群体,指示数据对哪些角色或用户群体有用 // 描述此数据对服务器操作的重要性 // // 值为1表示“最重要”,并指示数据实际上是必需的,而0表示“最不重要”,并指示数据完全是可选的 Priority float64 `json:"priority,omitempty"` // 优先级,表示数据对服务器操作的重要程度,范围从0(最不重要)到1(最重要) } `json:"annotations,omitempty"` }
其中:
- URI用于定位一个具体资源的标识,场景的有http://、file://、postgres://等等还可以去https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml 看看。
- MIMEType表示文件的类型,常见的有 text/html、image/png,这里也可以查看到更多:https://www.iana.org/assignments/media-types/media-types.xhtml。
func (s *MCPServer) AddResource( resource mcp.Resource, handler ResourceHandlerFunc, ) { // 检查资源 if s.capabilities.resources == nil { s.capabilities.resources = &resourceCapabilities{} } // 加锁 s.mu.Lock() // 解锁(defer) defer s.mu.Unlock() // 存储资源 s.resources[resource.URI] = resourceEntry{ resource: resource, handler: handler, } }
resource是放入到MCPServer的resources字段中,使用URI作为key,resourceEntry作为value,其中resourceEntry结构如下:
type resourceEntry struct { resource mcp.Resource handler ResourceHandlerFunc }
框架还提供了添加Resource templates的功能,主要是针对动态资源,服务器可以公开 URI 模板 ,客户端可以使用它来构建有效的资源 URI。
// Dynamic resource example - user profiles by ID template := mcp.NewResourceTemplate( "users://{id}/profile", "User Profile", mcp.WithTemplateDescription("Returns user profile information"), mcp.WithTemplateMIMEType("application/json"), ) // Add template with its handler s.AddResourceTemplate(template, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { // Extract ID from the URI using regex matching // The server automatically matches URIs to templates userID := extractIDFromURI(request.Params.URI) profile := fmt.Sprintf("Hello %s", userID) // Your DB/API call here return []mcp.ResourceContents{ mcp.TextResourceContents{ URI: request.Params.URI, MIMEType: "application/json", Text: profile, }, }, nil }) // extractIDFromURI 从给定的 URI 中提取用户 ID。 // 假设 URI 的格式为 "users://{id}/profile"。 func extractIDFromURI(uri string) string { // 定义正则表达式来匹配 URI 中的 ID re := regexp.MustCompile(`users://([^/]+)/profile`) // 使用正则表达式查找匹配项 matches := re.FindStringSubmatch(uri) // 如果找到了匹配项,并且匹配项的数量正确,则返回 ID if len(matches) == 2 { return matches[1] } // 如果没有找到匹配项,或者匹配项的数量不正确,则返回空字符串 return "" }
4.5 Prompts添加
// Simple greeting prompt s.AddPrompt(mcp.NewPrompt("greeting", mcp.WithPromptDescription("A friendly greeting prompt"), mcp.WithArgument("name", mcp.ArgumentDescription("Name of the person to greet"), ), ), func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { name := request.Params.Arguments["name"] if name == "" { name = "friend" } return mcp.NewGetPromptResult( "A friendly greeting", []mcp.PromptMessage{ mcp.NewPromptMessage( mcp.RoleAssistant, mcp.NewTextContent(fmt.Sprintf("Hello, %s! How can I help you today?", name)), ), }, ), nil })
// Prompt 表示服务器提供的提示或提示模板。 // 如果 Arguments 非空且包含元素,则表示该提示是一个模板, // 在调用 prompts/get 时需要提供参数值。 // 如果 Arguments 为空或为 nil,则这是一个不需要参数的静态提示。 type Prompt struct { // 提示或提示模板的名称。 Name string `json:"name"` // 提示提供内容的可选描述。 Description string `json:"description,omitempty"` // 用于模板化提示的参数列表。 // 参数的存在表明这是一个模板提示。 Arguments []PromptArgument `json:"arguments,omitempty"` } // PromptArgument 描述提示模板可以接受的参数。 // 当提示包含参数时,客户端在发出 prompts/get 请求时 // 必须为所有必需参数提供值。 type PromptArgument struct { // 参数的名称。 Name string `json:"name"` // 参数的可读描述。 Description string `json:"description,omitempty"` // 此参数是否必须提供。 // 如果为 true,则客户端在调用 prompts/get 时必须包含此参数。 Required bool `json:"required,omitempty"` }
可以通过mcp.NewPrompt方法来生成Prompt对象:
func NewPrompt(name string, opts ...PromptOption) Prompt { prompt := Prompt{ Name: name, } for _, opt := range opts { opt(&prompt) } return prompt }
又见到了opts ...PromptOption,看看有哪些选项:
- WithPromptDescription用于设置description;
- WithArgument用于设置arguments;
4.6 选择传输方式
// 启动标准输入输出服务器 if err := server.ServeStdio(s); err != nil { fmt.Printf("Server error: %v\n", err) // 打印服务器错误 }
MCP当前主要提供两类Stdio transport和HTTP with SSE transport 。
4.6.1 Stdio
创建StdioServer
type StdioServer struct { server *MCPServer errLogger *log.Logger contextFunc StdioContextFunc }
其中在,重要的字段有server和contextFunc,给MCPServer进来就是为了具备MCP的能力,只是使用stdio的传输方式。contextFunc是为了让外部自定义的context可以进入到StdioServer,可以用于结束服务和控制超时.
// 为MCP Server 启用stdio func ServeStdio(server *MCPServer, opts ...StdioOption) error { // 创建Stdio服务器 s := NewStdioServer(server) // 设置错误日志 s.SetErrorLogger(log.New(os.Stderr, "", log.LstdFlags)) // 应用选项 for _, opt := range opts { opt(s) } // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 设置信号通道 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) // 监听信号 go func() { <-sigChan cancel() }() // 开始监听 return s.Listen(ctx, os.Stdin, os.Stdout) }
其中最关键的是Listen方法,它是 StdioServer 类型的一个关键方法,用于监听标准输入输出的 JSON-RPC 消息。
// Listen 启动监听,从提供的输入读取 JSON-RPC 消息,并将响应写入提供的输出。 // 它将持续运行,直到上下文被取消或发生错误。 // 如果在读取输入或写入输出时遇到问题,将返回错误。 func (s *StdioServer) Listen( ctx context.Context, // 监听过程的上下文,用于控制生命周期和传递请求范围的信息 stdin io.Reader, // 标准输入流,用于读取客户端发送的 JSON-RPC 消息 stdout io.Writer, // 标准输出流,用于将响应写回客户端 ) error { // 由于标准输入输出只有一个客户端,因此设置一个静态客户端上下文,SessionId为stdio if err := s.server.RegisterSession(&stdioSessionInstance); err != nil { // 如果会话注册失败,返回错误 return fmt.Errorf("register session: %w", err) } // 确保在函数结束时注销会话 defer s.server.UnregisterSession(stdioSessionInstance.SessionID()) // 更新上下文,将会话信息加入 ctx = s.server.WithContext(ctx, &stdioSessionInstance) // 如果存在自定义上下文函数,则应用该函数修改上下文 if s.contextFunc != nil { ctx = s.contextFunc(ctx) } // 创建一个带缓冲的读取器,用于从标准输入流高效读取数据 reader := bufio.NewReader(stdin) // 启动一个协程专门处理通知 go func() { for { select { case notification := <-stdioSessionInstance.notifications: // 收到通知时,调用 writeResponse 方法将通知写入标准输出 err := s.writeResponse(notification, stdout) if err != nil { // 如果写入通知时出错,记录错误日志 s.errLogger.Printf("Error writing notification: %v", err) } case <-ctx.Done(): // 如果上下文完成,退出协程 return } } }() // 主循环,用于处理输入消息 for { select { case <-ctx.Done(): // 如果上下文完成,返回上下文错误 return ctx.Err() default: // 使用协程使读取操作可取消 readChan := make(chan string, 1) // 用于接收读取到的行 errChan := make(chan error, 1) // 用于接收读取错误 go func() { line, err := reader.ReadString('\n') // 读取一行输入 if err != nil { errChan <- err // 发送读取错误 return } readChan <- line // 发送读取到的行 }() select { case <-ctx.Done(): // 如果上下文完成,返回上下文错误 return ctx.Err() case err := <-errChan: // 处理读取错误 if err == io.EOF { // 如果是文件结束符,表示输入结束,返回 nil return nil } // 其他错误则记录日志并返回 s.errLogger.Printf("Error reading input: %v", err) return err case line := <-readChan: // 处理读取到的行 if err := s.processMessage(ctx, line, stdout); err != nil { // 如果处理消息时出错 if err == io.EOF { // 如果是文件结束符,返回 nil return nil } // 其他错误则记录日志并返回 s.errLogger.Printf("Error handling message: %v", err) return err } } } } }
4.6.2 SSE
mcpServer := NewMCPServer() sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080")) log.Printf("SSE server listening on :8080") if err := sseServer.Start(":8080"); err != nil { log.Fatalf("Server error: %v", err) }
现在已经知道如何在mark3labs/mcp-go中启用SSE了,现在来分析一下,它是如何实现的。
创建SSEServer
// SSEServer implements a Server-Sent Events (SSE) based MCP server. // It provides real-time communication capabilities over HTTP using the SSE protocol. type SSEServer struct { server *MCPServer // MCPServer 实例,用于处理实际的消息传递和通信逻辑 baseURL string // SSE 服务器的基础 URL,用于构建完整的端点路径 basePath string // SSE 服务器的基础路径,通常用于区分不同的服务或版本 messageEndpoint string // 消息端点的路径,客户端通过此端点发送 JSON-RPC 消息 sseEndpoint string // SSE 端点的路径,客户端通过此端点建立 SSE 连接 sessions sync.Map // 存储活动 SSE 会话的同步映射,用于跟踪和管理客户端连接 srv *http.Server // 内部的 HTTP 服务器实例,用于处理 HTTP 请求和响应 contextFunc SSEContextFunc // 可选的上下文函数,用于根据请求内容自定义上下文 }
与之前的StdioServer相比,SSEServer多了用于http中使用的URI、path,还有srv这是一个httpServer的类型,用于支持HTTP请求和响应。继续查看如何构建SSEServer:
// NewSSEServer creates a new SSE server instance with the given MCP server and options. func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer { s := &SSEServer{ server: server, sseEndpoint: "/sse", messageEndpoint: "/message", } // Apply all options for _, opt := range opts { opt(s) } return s }
还是采用的才是经典Option模式,继续看看SSEOption有哪些选项:
名称 | 功能 | 使用方式 |
WithBaseURL | 设置 SSE 服务器的基础 URL | WithBaseURL("https://example.com") |
WithBasePath | 设置 SSE 服务器的基础路径 | WithBasePath("/v1") |
WithMessageEndpoint | 设置消息端点的路径 | WithMessageEndpoint("/custom-message") |
WithSSEEndpoint | 设置 SSE 端点的路径 | WithSSEEndpoint("/custom-sse") |
WithHTTPServer | 设置 HTTP 服务器实例(通常用于测试或自定义服务器配置) | WithHTTPServer(customHttpServer) |
WithContextFunc | 设置一个函数,用于根据请求内容自定义上下文 | WithContextFunc(func(ctx context.Context, r *http.Request) context.Context { ... }) |
func (s *SSEServer) Start(addr string) error { s.srv = &http.Server{ Addr: addr, Handler: s, } return s.srv.ListenAndServe() }
还可以调用Shutdown()实现对服务器的关闭。
4.6.3 集成到gin框架
// 创建一个新的 Gin 引擎 r := gin.Default() // 创建一个新的 MCPServer 实例(假设这是 SSEServer 所需的) mcpServer := server.NewMCPServer("gin-mcp-server", "1.0.0") // 根据你的实际代码调整 // mcpServer 新加Tool、Resource、Prompt // ... ... // 创建一个新的 SSEServer 实例,并传入 MCPServer sseServer := server.NewSSEServer(mcpServer) // 将 SSEServer 的 SSE 端点和处理函数集成到 Gin 路由中 r.GET(sseServer.CompleteSsePath(), func(c *gin.Context) { sseServer.ServeHTTP(c.Writer, c.Request) }) // 将 SSEServer 的消息端点和处理函数集成到 Gin 路由中 r.POST(sseServer.CompleteMessagePath(), func(c *gin.Context) { sseServer.ServeHTTP(c.Writer, c.Request) }) // 启动 Gin 服务器 if err := r.Run("localhost:8081"); err != nil { log.Fatalf("Gin server startup failed: %v", err) }
上述的代码会生成两个路由:
路由 | 方法 | 作用 | 实例 |
/sse | GET |
|
|
/message | POST |
|
|
4.7 处理请求
4.7.1 路由入口
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 获取请求的路径 path := r.URL.Path // 使用精确路径匹配,而不是模糊包含 ssePath := s.CompleteSsePath() // 获取完整的 SSE 路径 if ssePath != "" && path == ssePath { // 如果请求路径与 SSE 路径匹配,则处理 SSE 请求 s.handleSSE(w, r) return // 处理完成后直接返回,不再继续后续逻辑 } // 获取消息处理的完整路径 messagePath := s.CompleteMessagePath() if messagePath != "" && path == messagePath { // 如果请求路径与消息处理路径匹配,则处理消息请求 s.handleMessage(w, r) return // 处理完成后直接返回,不再继续后续逻辑 } // 如果请求路径不匹配任何已知路径,则返回 404 未找到 http.NotFound(w, r) }
其中:
- SSE路径为:s.baseURL + s.basePath + s.sseEndpoint
- Message路径为:s.baseURL + s.basePath + s.messageEndpoint
4.7.2 handleSSE
- 请求方法检查:只允许GET请求。
- 设置响应头:设置适当的SSE响应头。
- 创建会话:为每个客户端创建一个新的SSE会话。
- 注册和注销会话:在服务器中注册会话,并在处理完成后注销。
- 通知处理器:启动一个goroutine处理来自通知通道的事件,并将其发送到客户端。
- 主事件循环:处理来自事件队列的事件,并将其发送到客户端。
// 它设置适当的头信息并为客户端创建一个新的会话。 func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { // 1. 检查请求方法是否为GET,如果不是,返回405 Method Not Allowed错误 if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 2. 设置SSE相关的响应头 w.Header().Set("Content-Type", "text/event-stream") // 设置内容类型为text/event-stream w.Header().Set("Cache-Control", "no-cache") // 禁用缓存 w.Header().Set("Connection", "keep-alive") // 保持连接活跃 w.Header().Set("Access-Control-Allow-Origin", "*") // 允许所有域跨域请求 // 检查ResponseWriter是否支持Flush,如果不支持,返回500 Internal Server Error错误 flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // 3. 创建一个新的会话ID和会话对象 sessionID := uuid.New().String() // 生成唯一的会话ID session := &sseSession{ writer: w, // 响应写入器 flusher: flusher, // 刷新器 done: make(chan struct{}), // 用于通知会话结束的通道 eventQueue: make(chan string, 100), // 事件队列,缓冲区大小为100 sessionID: sessionID, // 会话ID notificationChannel: make(chan mcp.JSONRPCNotification, 100), // 通知通道,缓冲区大小为100 } // 4. 将会话存储到会话存储中,并在处理完成后删除 s.sessions.Store(sessionID, session) defer s.sessions.Delete(sessionID) // 在服务器中注册会话,如果注册失败,返回500 Internal Server Error错误 if err := s.server.RegisterSession(session); err != nil { http.Error(w, fmt.Sprintf("Session registration failed: %v", err), http.StatusInternalServerError) return } // 在处理完成后注销会话 defer s.server.UnregisterSession(sessionID) // 5. 启动一个goroutine处理通知通道中的事件 go func() { for { select { case notification := <-session.notificationChannel: // 从通知通道接收通知 eventData, err := json.Marshal(notification) // 将通知序列化为JSON if err == nil { select { case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 将事件发送到事件队列 // 事件成功入队 case <-session.done: // 如果会话结束,退出goroutine return } } case <-session.done: // 如果会话结束,退出goroutine return case <-r.Context().Done(): // 如果请求上下文被取消,退出goroutine return } } }() // 生成消息端点URL并发送初始的endpoint事件 messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) // 发送endpoint事件 flusher.Flush() // 刷新响应,确保事件立即发送到客户端 // 6. 主事件循环,运行在HTTP处理器goroutine中 for { select { case event := <-session.eventQueue: // 从事件队列接收事件 fmt.Fprint(w, event) // 将事件写入响应 flusher.Flush() // 刷新响应,确保事件立即发送到客户端 case <-r.Context().Done(): // 如果请求上下文被取消,关闭会话并退出 close(session.done) return } } }
第三阶段又看到Session了,与之前的stdioSession相比,sseSession明显复杂多了,它们都实现接口ClientSession:
type ClientSession interface { // NotificationChannel provides a channel suitable for sending notifications to client. NotificationChannel() chan<- mcp.JSONRPCNotification // SessionID is a unique identifier used to track user session. SessionID() string }
sseSession是专门用于表示一个基于服务器发送事件(Server-Sent Events, SSE)协议的活跃连接。sseSession负责管理客户端与服务器之间的单向实时数据推送,并保持会话。其结构体如下:
type sseSession struct { writer http.ResponseWriter // HTTP响应写入器,用于向客户端发送数据 flusher http.Flusher // HTTP刷新器,用于刷新响应缓冲区,确保数据立即发送给客户端 done chan struct{} // 用于通知会话结束的通道 eventQueue chan string // 用于排队事件的通道,存储待发送给客户端的事件 sessionID string // 会话的唯一标识符 notificationChannel chan mcp.JSONRPCNotification // 用于接收JSON-RPC通知的通道 }
4.7.3 handleMessage
- 请求验证:检查请求方法
- 检查请求方法:方法首先检查请求方法是否为 HTTP POST。如果不是,返回 "Method not allowed" 错误,并终止处理。
- 验证 sessionId 参数:从请求的 URL 查询参数中获取 sessionId。如果缺失,返回 "Missing sessionId" 错误。
- 加载会话:使用 sessionId 从会话存储中加载会话。如果会话不存在,返回 "Invalid session ID" 错误。
-
- 设置上下文
- 调用 s.server.WithContext 方法,将请求上下文和会话信息合并,生成新的上下文。如果提供了 contextFunc,则进一步处理上下文。
-
- 解析 JSON-RPC 消息
- 使用 json.NewDecoder 解析请求体中的原始 JSON 消息。如果解析失败,返回 "Parse error" 错误。
-
- 处理消息
- 将解析后的消息传递给 s.server.HandleMessage 方法进行处理,并获取响应结果。
-
- 发送响应如果 HandleMessage 返回了响应(非通知),则:如果 HandleMessage 没有返回响应(通知),则仅设置 HTTP 响应状态码为 202 Accepted,不发送响应体。
- 将响应编码为 JSON 格式。
- 将响应事件加入会话的事件队列,供 SSE 连接发送。
- 设置 HTTP 响应头为 application/json,状态码为 202 Accepted,并发送响应体。
-
- 事件队列处理
// handleMessage 处理来自客户端的JSON-RPC消息,并通过SSE连接和HTTP响应返回结果 func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) { // 1. 如果请求方法不是POST,则返回JSON-RPC错误响应,表示方法不允许 if r.Method != http.MethodPost { s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed") return } // 从请求的URL查询参数中获取sessionId sessionID := r.URL.Query().Get("sessionId") // 如果sessionId为空,则返回JSON-RPC错误响应,表示缺少sessionId参数 if sessionID == "" { s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId") return } // 从session存储中加载与sessionId对应的session sessionI, ok := s.sessions.Load(sessionID) // 如果session不存在,则返回JSON-RPC错误响应,表示无效的session ID if !ok { s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID") return } session := sessionI.(*sseSession) // 在处理消息之前设置客户端上下文 ctx := s.server.WithContext(r.Context(), session) // 如果提供了自定义的上下文函数,则应用它 if s.contextFunc != nil { ctx = s.contextFunc(ctx, r) } // 将请求体解析为原始JSON消息 var rawMessage json.RawMessage if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil { // 如果解析失败,则返回JSON-RPC错误响应,表示解析错误 s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error") return } // 通过MCPServer处理消息 response := s.server.HandleMessage(ctx, rawMessage) // 如果存在响应(非通知),则发送响应 if response != nil { // 将响应编码为JSON格式 eventData, _ := json.Marshal(response) // 将事件排队以通过SSE发送 select { case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 事件成功排队 case <-session.done: // 会话已关闭,不尝试排队 default: // 队列已满,可以记录此情况 } // 发送HTTP响应 w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(response) } else { // 对于通知,只发送202 Accepted状态码,无响应体 w.WriteHeader(http.StatusAccepted) } }
- 尝试将原始消息解析为对应请求类型。
- 如果解析失败,记录错误信息。
- 执行请求前的钩子函数。
- 调用对应的处理函数处理请求。
- 如果处理过程中发生错误,执行错误钩子函数并返回错误响应。
- 执行请求后的钩子函数。
- 返回成功响应。
// 根据消息方法进行分情况处理 switch baseMessage.Method { // 初始化请求处理 case mcp.MethodInitialize: var request mcp.InitializeRequest var result *mcp.InitializeResult // 尝试将原始消息解析为初始化请求类型 if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil { // 如果解析失败,记录错误信息 err = &requestError{ id: baseMessage.ID, code: mcp.INVALID_REQUEST, err: &UnparseableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method}, } } else { // 执行初始化请求前的钩子函数 s.hooks.beforeInitialize(baseMessage.ID, &request) // 处理初始化请求 result, err = s.handleInitialize(ctx, baseMessage.ID, request) } // 如果处理过程中发生错误 if err != nil { // 执行错误钩子函数 s.hooks.onError(baseMessage.ID, baseMessage.Method, &request, err) // 返回错误响应 return err.ToJSONRPCError() } // 执行初始化请求后的钩子函数 s.hooks.afterInitialize(baseMessage.ID, &request, result) // 返回成功响应 return createResponse(baseMessage.ID, *result) // 其他方法处理逻辑类似,省略... // 如果方法不匹配任何已知方法,返回方法未找到错误响应 default: return createErrorResponse( baseMessage.ID, mcp.METHOD_NOT_FOUND, fmt.Sprintf("Method %s not found", baseMessage.Method), ) }
代码比较多,可以自行查看:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/request_handler.go#L12
五、总结

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
《跟老卫学仓颉编程语言开发》实战:猜数字游戏
以前中央电视台财经频道推出过一档大型演播室互动娱乐节目《购物街》,该节目里面包含了一个叫做“高了低了”的游戏环节。笔者非常喜爱这个游戏。这个游戏环节设置了八个百元价位左右的商品,首先选手要选择编号1-8的商品,之后猜这个商品的价格。在选手猜价格的过程中,主持人会给出高了、低了的提示,直到帮助选手猜出正确价格为止。之后继续选择,以此类推,直到30秒时间到为止。 本节所要介绍的猜数字游戏也是类似的,程序给出一个1到100之间的随机整数,让用户猜。用户猜一个数并输入到程序,然后程序会提示猜测是大了还是小了。如果猜对了,它会打印祝贺信息并退出。 本节通过仓颉语言,来开发一个简单的猜数字游戏,综合运用了流程控制、标准输入、字符串的操作、整型的比较等知识。 本节示例可以在“guessing_game”应用下找到。 输入数字 在程序界面输入数字代表用户猜数字的实现。如何实现在程序界面输入数字?这里就需要用到std.console包,该模块包含许多在执行输入和输出时需要的常见操作。 import std.console.* // 标准输入流(stdIn)读取一行 let line = Console....
- 下一篇
算力,并不是大模型厂商发展的护城河
上个月,OpenAI CEO 山姆·奥尔特曼在社交媒体上表示,号称 52 万亿参数量的 GPT-5 将在数月内发布。相较于上一代 GPT-4 的 2 万亿参数,体量上足足增长了 26 倍,虽无公布具体的训练成本,但想必也一定是个天文数字,堪称大模型领域的“力大砖飞”。 反观 LLM 界的“黑天鹅”,DeepSeek-V3 却仅用了 2048 块英伟达 H800 ,耗费了 557.6 万美金就完成了训练,一度引起硅谷恐慌,力证了:算力并非不可逾越的堡垒。 一边是暴力填鸭,一边是技术深化,2025 年的大模型似乎走出了两条截然不同的道路,也逐渐撕开了 AI 行业最残酷的真相:早期以“算力”建立护城河的大模型厂商们,在面对新一轮技术冲击时,高成本的算力反而成为了其灵活发展的累赘。 算力的必要性和局限性 作为数字经济的“新电力”,算力在大模型的训练和推理过程中确实起到了不可或缺的作用。 以 OpenAI 为例,早期在 GPT-4 的训练中,大概就使用了 25000 个 A100 芯片。如果 OpenAI 云计算的成本是差不多 1 美元/每 A100 小时的话,那么在这样的条件下,仅一次训练的成...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Red5直播服务器,属于Java语言的直播服务器