万字长文解读MCP框架,让你掌握mark3labs/mcp-go
一、引言
二、MCP-Server的简述
2.1 连接生命周期
2.1.1 初始化连接
-
客户端发送带有协议版本和功能initialize请求。
-
服务器以其协议版本和功能进行响应
-
客户端发送initialized通知作为确认
-
开始正常信息交换
2.1.2 信息交换
-
请求-响应:客户端或服务器发送请求,对方响应
-
通知:任何一方发送单向消息
2.1.3 终止
enum ErrorCode {
// Standard JSON-RPC error codes
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603
}
任何一方都可以终止连接:
-
通过close()干净关闭
-
传输断开
-
错误情况
-
请求的错误响应
-
传输中的错误事件
-
协议级错误处理程序
-
2.2 MCP Server的业务能力
Request Method | 发起方 | 响应方 | 描述 |
initialized | Client | Server | 初始化会话 |
tools-list | Client | Server | 发现可用的工具 |
tools/call | Client | Server | 调用工具 |
resources/list | Client | Server | 发现可用的资源 |
resources/read | Client | Server | 获取资源内容 |
resources/templates | Client | Server | 发现可用的参数化资源 |
resources/subscribe | Client | Server | 订阅特定资源,监听其变化事件 |
prompts/list | Client | Server | 发现可用的提示词 |
prompts/get | Client | Server | 获取特定提示词 |
roots/list | Server | Client | 列出服务器有权访问的客户端文件系统根节点(暴露目录和文件) |
sampling/create | Server | Client | 启用服务器的AI生成能力( sampling creation ) |
三、mark3labs/mcp-go示例
package main
import (
"context"
"errors"
"fmt"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
// 创建MCP服务器
s := server.NewMCPServer(
"Demo 🚀", // 服务器名称
"1.0.0", // 服务器版本
)
// 添加工具
tool := mcp.NewTool("hello_world", // 工具名称
mcp.WithDescription("Say hello to someone"), // 工具描述
mcp.WithString("name", // 参数名称
mcp.Required(), // 参数是必需的
mcp.Description("Name of the person to greet"), // 参数描述
),
)
// 为工具添加处理器
s.AddTool(tool, helloHandler)
// 启动标准输入输出服务器
if err := server.ServeStdio(s); err != nil {
fmt.Printf("Server error: %v\n", err) // 打印服务器错误
}
}
func helloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// 从请求参数中获取名字参数,并断言为字符串类型
name, ok := request.Params.Arguments["name"].(string)
if !ok {
// 如果断言失败,返回错误
return nil, errors.New("name must be a string")
}
// 返回包含问候语的结果
return mcp.NewToolResultText(fmt.Sprintf("Hello, %s!", name)), nil
}
3.1 搭建一个联调环境
$ go build -v -o server
再启动mcp inspetor:
$ npx -y @modelcontextprotocol/inspector ./server
四、源码解读
4.1 MCPServer结构体
type MCPServer struct {
mu sync.RWMutex // 用于保护共享资源,确保并发访问时的数据一致性
name string // 服务器的名称,用于标识服务器
version string // 服务器的版本,用于跟踪和管理服务器的不同版本
instructions string // 服务器的指令,通常在初始化响应中返回给客户端,提供使用指南或帮助信息
resources map[string]resourceEntry // 存储服务器支持的资源及其处理函数
resourceTemplates map[string]resourceTemplateEntry // 存储资源模板及其处理函数,支持URI模板匹配多类似资源
prompts map[string]mcp.Prompt // 存储服务器支持的提示,用于与用户交互
promptHandlers map[string]PromptHandlerFunc // 存储处理提示请求的函数,每个提示对应一个处理函数
tools map[string]ServerTool // 存储服务器支持的工具及其处理函数
notificationHandlers map[string]NotificationHandlerFunc // 存储处理传入通知的函数,接收客户端通知并处理
capabilities serverCapabilities // 定义服务器支持的功能特性,包括资源、提示、工具和日志记录等
sessions sync.Map // 存储当前活跃的客户端会话,用于跟踪用户交互
initialized atomic.Bool // 使用原子操作标记服务器是否已初始化,确保线程安全
hooks *Hooks // 存储服务器钩子,允许在请求处理前后或返回错误前执行自定义逻辑
}
MCPServer 是 Model Control Protocol (MCP) 服务器的实现,用于处理包括资源、提示和工具在内的各种类型的请求。
4.2 MCPServer初始化
func NewMCPServer(
name, version string,
opts ...ServerOption,
) *MCPServer {
s := &MCPServer{
resources: make(map[string]resourceEntry),
resourceTemplates: make(map[string]resourceTemplateEntry),
prompts: make(map[string]mcp.Prompt),
promptHandlers: make(map[string]PromptHandlerFunc),
tools: make(map[string]ServerTool),
name: name,
version: version,
notificationHandlers: make(map[string]NotificationHandlerFunc),
capabilities: serverCapabilities{
tools: nil,
resources: nil,
prompts: nil,
logging: false,
},
}
for _, opt := range opts {
opt(s)
}
return s
}
在NewMCPServer()方法中,我们需要关注的opts ...ServerOption,进一步看看ServerOption有哪些选项:
选项 | 功能 | 使用方式 |
WithResourceCapabilities | 配置资源相关的服务器功能,如订阅和资源列表变化通知 | WithResourceCapabilities(subscribe, listChanged bool) |
WithHooks | 添加钩子函数,用于在请求处理前后执行特定逻辑 | WithHooks(hooks *Hooks) |
WithPromptCapabilities | 配置提示相关的服务器功能,如提示列表变化通知 | WithPromptCapabilities(listChanged bool) |
WithToolCapabilities | 配置工具相关的服务器功能,如工具列表变化通知 | WithToolCapabilities(listChanged bool) |
WithLogging | 启用服务器日志记录功能 | WithLogging() |
WithInstructions | 设置服务器指令,用于在初始化响应中返回给客户端 | WithInstructions(instructions string) |
字段名 | 类型 | 描述 |
OnBeforeAny | []BeforeAnyHookFunc | 在请求被解析后但方法调用前执行的钩子函数。 |
OnSuccess | []OnSuccessHookFunc | 在请求成功生成结果但结果尚未发送给客户端之前执行的钩子函数。 |
OnError | []OnErrorHookFunc | 在请求解析或方法执行过程中发生错误时执行的钩子函数。 |
OnBeforeInitialize | []OnBeforeInitializeFunc | 在处理初始化请求前执行的钩子函数。 |
OnAfterInitialize | []OnAfterInitializeFunc | 在处理初始化请求后执行的钩子函数。 |
OnBeforePing | []OnBeforePingFunc | 在处理 Ping 请求前执行的钩子函数。 |
OnAfterPing | []OnAfterPingFunc | 在处理 Ping 请求后执行的钩子函数。 |
OnBeforeListResources | []OnBeforeListResourcesFunc | 在处理列出资源请求前执行的钩子函数。 |
OnAfterListResources | []OnAfterListResourcesFunc | 在处理列出资源请求后执行的钩子函数。 |
OnBeforeListResourceTemplates | []OnBeforeListResourceTemplatesFunc | 在处理列出资源模板请求前执行的钩子函数。 |
OnAfterListResourceTemplates | []OnAfterListResourceTemplatesFunc | 在处理列出资源模板请求后执行的钩子函数。 |
OnBeforeReadResource | []OnBeforeReadResourceFunc | 在处理读取资源请求前执行的钩子函数。 |
OnAfterReadResource | []OnAfterReadResourceFunc | 在处理读取资源请求后执行的钩子函数。 |
OnBeforeListPrompts | []OnBeforeListPromptsFunc | 在处理列出提示请求前执行的钩子函数。 |
OnAfterListPrompts | []OnAfterListPromptsFunc | 在处理列出提示请求后执行的钩子函数。 |
OnBeforeGetPrompt | []OnBeforeGetPromptFunc | 在处理获取提示请求前执行的钩子函数。 |
OnAfterGetPrompt | []OnAfterGetPromptFunc | 在处理获取提示请求后执行的钩子函数。 |
OnBeforeListTools | []OnBeforeListToolsFunc | 在处理列出工具请求前执行的钩子函数。 |
OnAfterListTools | []OnAfterListToolsFunc | 在处理列出工具请求后执行的钩子函数。 |
OnBeforeCallTool | []OnBeforeCallToolFunc | 在处理调用工具请求前执行的钩子函数。 |
OnAfterCallTool | []OnAfterCallToolFunc | 在处理调用工具请求后执行的钩子函数。 |
hooks := &server.Hooks{}
hooks.AddBeforeAny(func(id any, method mcp.MCPMethod, message any) {
fmt.Printf("beforeAny: %s, %v, %v\n", method, id, message)
})
hooks.AddOnSuccess(func(id any, method mcp.MCPMethod, message any, result any) {
fmt.Printf("onSuccess: %s, %v, %v, %v\n", method, id, message, result)
})
hooks.AddOnError(func(id any, method mcp.MCPMethod, message any, err error) {
fmt.Printf("onError: %s, %v, %v, %v\n", method, id, message, err)
})
hooks.AddBeforeInitialize(func(id any, message *mcp.InitializeRequest) {
fmt.Printf("beforeInitialize: %v, %v\n", id, message)
})
hooks.AddAfterInitialize(func(id any, message *mcp.InitializeRequest, result *mcp.InitializeResult) {
fmt.Printf("afterInitialize: %v, %v, %v\n", id, message, result)
})
hooks.AddAfterCallTool(func(id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) {
fmt.Printf("afterCallTool: %v, %v, %v\n", id, message, result)
})
hooks.AddBeforeCallTool(func(id any, message *mcp.CallToolRequest) {
fmt.Printf("beforeCallTool: %v, %v\n", id, message)
})
// 创建MCP服务器
s := server.NewMCPServer(
"Demo 🚀", // 服务器名称
"1.0.0", // 服务器版本
server.WithLogging(),
server.WithToolCapabilities(true),
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithInstructions("initialized"),
server.WithHooks(hooks),
)
4.3 Tools模块
4.3.1 创建Tool
-
mcp.NewTool(name string, opts ...ToolOption) Tool
-
mcp.NewToolWithRawSchema(name, description string, schema json.RawMessage) Tool
方式一:mcp.NewTool()
tool := mcp.NewTool("hello_world", // 工具名称
mcp.WithDescription("Say hello to someone"), // 工具描述
mcp.WithString("name", // 参数名称
mcp.Required(), // 参数是必需的
mcp.Description("Name of the person to greet"), // 参数描述
),
)
方式二:mcp.NewToolWithRawSchema()
rawSchema := json.RawMessage(`{
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name of the person to greet"}
},
"required": ["name"]
}`)
// Create a tool with raw schema
toolRS := mcp.NewToolWithRawSchema("hello_world_1", "Say hello to someone", rawSchema)
其中rawSchema的结构需要符合ToolInputSchema结构体:
type ToolInputSchema struct {
Type string `json:"type"`
Properties map[string]interface{} `json:"properties,omitempty"`
Required []string `json:"required,omitempty"`
}
需要注意的是Properties来源于jsonSchema,所以具备的需要校验属性,比如default、maximum、minimum、maxLength、minLength、enum等等。其中key为请求传入的参数字段,interface{}为对key的各种属性校验。
4.3.2 存放Tool
// AddTool registers a new tool and its handler
func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) {
s.AddTools(ServerTool{Tool: tool, Handler: handler})
}
// AddTools registers multiple tools at once
func (s *MCPServer) AddTools(tools ...ServerTool) {
// 检查工具
if s.capabilities.tools == nil {
s.capabilities.tools = &toolCapabilities{}
}
// 加锁
s.mu.Lock()
// 遍历工具
for _, entry := range tools {
s.tools[entry.Tool.Name] = entry
}
// 获取初始化状态
initialized := s.initialized.Load()
s.mu.Unlock()
// 发送通知
if initialized {
s.sendNotificationToAllClients("notifications/tools/list_changed", nil)
}
}
可以看出Tool是放入到MCPServer的tools字段中,使用name作为key,ServerTool作为value,其中ServerTool结构如下:
type ServerTool struct {
Tool mcp.Tool
Handler ToolHandlerFunc
}
框架还提供了:
-
DeleteTools(names... string)作为删除Tool关联的方法。
-
SetTools(tools ...ServerTool)可以设置当前所有的Tool 列表。
4.4 Resource模块
// Static resource example - exposing a README file
resource := mcp.NewResource(
"docs://readme",
"Project README",
mcp.WithResourceDescription("The project's README file"),
mcp.WithMIMEType("text/markdown"),
)
// Add resource with its handler
s.AddResource(resource, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
content, err := os.ReadFile("main.go")
if err != nil {
return nil, err
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: "docs://readme",
MIMEType: "text/markdown",
Text: string(content),
},
}, nil
})
-
File contents 文件内容
-
Database records 数据库记录
-
API responses API响应
-
Live system data 实时系统数据
-
images 图像
-
Log files 日志文件
-
等等... ...
type Resource struct {
Annotated // 包含可选注解,用于告知客户端如何使用或显示对象
// 资源的URI
URI string `json:"uri"` // 资源的唯一标识符,用于定位和访问资源
// 资源的可读名称
//
// 客户端可以使用此名称来填充UI元素
Name string `json:"name"` // 资源的显示名称,便于用户理解和界面展示
// 对此资源所代表内容的描述
//
// 客户端可以使用此描述来帮助大型语言模型(LLM)理解可用资源
// 这可以看作是对模型的“提示”
Description string `json:"description,omitempty"` // 资源的详细描述,为模型提供上下文信息
// 如果已知,此资源的MIME类型
MIMEType string `json:"mimeType,omitempty"` // 资源的媒体类型,如text/plain、image/jpeg等,用于指示资源的内容格式
}
type Annotated struct {
Annotations *struct {
// 描述此对象或数据的预期客户是谁
//
// 它可以包含多个条目,以指示对多个受众有用的内容(例如,`["user", "assistant"]`)
Audience []Role `json:"audience,omitempty"` // 受众群体,指示数据对哪些角色或用户群体有用
// 描述此数据对服务器操作的重要性
//
// 值为1表示“最重要”,并指示数据实际上是必需的,而0表示“最不重要”,并指示数据完全是可选的
Priority float64 `json:"priority,omitempty"` // 优先级,表示数据对服务器操作的重要程度,范围从0(最不重要)到1(最重要)
} `json:"annotations,omitempty"`
}
其中:
-
URI用于定位一个具体资源的标识,场景的有http://、file://、postgres://等等还可以去https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml 看看。
-
MIMEType表示文件的类型,常见的有 text/html、image/png,这里也可以查看到更多:https://www.iana.org/assignments/media-types/media-types.xhtml。
func (s *MCPServer) AddResource(
resource mcp.Resource,
handler ResourceHandlerFunc,
) {
// 检查资源
if s.capabilities.resources == nil {
s.capabilities.resources = &resourceCapabilities{}
}
// 加锁
s.mu.Lock()
// 解锁(defer)
defer s.mu.Unlock()
// 存储资源
s.resources[resource.URI] = resourceEntry{
resource: resource,
handler: handler,
}
}
resource是放入到MCPServer的resources字段中,使用URI作为key,resourceEntry作为value,其中resourceEntry结构如下:
type resourceEntry struct {
resource mcp.Resource
handler ResourceHandlerFunc
}
框架还提供了添加Resource templates的功能,主要是针对动态资源,服务器可以公开 URI 模板 ,客户端可以使用它来构建有效的资源 URI。
// Dynamic resource example - user profiles by ID
template := mcp.NewResourceTemplate(
"users://{id}/profile",
"User Profile",
mcp.WithTemplateDescription("Returns user profile information"),
mcp.WithTemplateMIMEType("application/json"),
)
// Add template with its handler
s.AddResourceTemplate(template, func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
// Extract ID from the URI using regex matching
// The server automatically matches URIs to templates
userID := extractIDFromURI(request.Params.URI)
profile := fmt.Sprintf("Hello %s", userID) // Your DB/API call here
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: request.Params.URI,
MIMEType: "application/json",
Text: profile,
},
}, nil
})
// extractIDFromURI 从给定的 URI 中提取用户 ID。
// 假设 URI 的格式为 "users://{id}/profile"。
func extractIDFromURI(uri string) string {
// 定义正则表达式来匹配 URI 中的 ID
re := regexp.MustCompile(`users://([^/]+)/profile`)
// 使用正则表达式查找匹配项
matches := re.FindStringSubmatch(uri)
// 如果找到了匹配项,并且匹配项的数量正确,则返回 ID
if len(matches) == 2 {
return matches[1]
}
// 如果没有找到匹配项,或者匹配项的数量不正确,则返回空字符串
return ""
}
4.5 Prompts添加
// Simple greeting prompt
s.AddPrompt(mcp.NewPrompt("greeting",
mcp.WithPromptDescription("A friendly greeting prompt"),
mcp.WithArgument("name",
mcp.ArgumentDescription("Name of the person to greet"),
),
), func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
name := request.Params.Arguments["name"]
if name == "" {
name = "friend"
}
return mcp.NewGetPromptResult(
"A friendly greeting",
[]mcp.PromptMessage{
mcp.NewPromptMessage(
mcp.RoleAssistant,
mcp.NewTextContent(fmt.Sprintf("Hello, %s! How can I help you today?", name)),
),
},
), nil
})
// Prompt 表示服务器提供的提示或提示模板。
// 如果 Arguments 非空且包含元素,则表示该提示是一个模板,
// 在调用 prompts/get 时需要提供参数值。
// 如果 Arguments 为空或为 nil,则这是一个不需要参数的静态提示。
type Prompt struct {
// 提示或提示模板的名称。
Name string `json:"name"`
// 提示提供内容的可选描述。
Description string `json:"description,omitempty"`
// 用于模板化提示的参数列表。
// 参数的存在表明这是一个模板提示。
Arguments []PromptArgument `json:"arguments,omitempty"`
}
// PromptArgument 描述提示模板可以接受的参数。
// 当提示包含参数时,客户端在发出 prompts/get 请求时
// 必须为所有必需参数提供值。
type PromptArgument struct {
// 参数的名称。
Name string `json:"name"`
// 参数的可读描述。
Description string `json:"description,omitempty"`
// 此参数是否必须提供。
// 如果为 true,则客户端在调用 prompts/get 时必须包含此参数。
Required bool `json:"required,omitempty"`
}
可以通过mcp.NewPrompt方法来生成Prompt对象:
func NewPrompt(name string, opts ...PromptOption) Prompt {
prompt := Prompt{
Name: name,
}
for _, opt := range opts {
opt(&prompt)
}
return prompt
}
又见到了opts ...PromptOption,看看有哪些选项:
-
WithPromptDescription用于设置description;
-
WithArgument用于设置arguments;
4.6 选择传输方式
// 启动标准输入输出服务器
if err := server.ServeStdio(s); err != nil {
fmt.Printf("Server error: %v\n", err) // 打印服务器错误
}
MCP当前主要提供两类Stdio transport和HTTP with SSE transport 。
4.6.1 Stdio
创建StdioServer
type StdioServer struct {
server *MCPServer
errLogger *log.Logger
contextFunc StdioContextFunc
}
其中在,重要的字段有server和contextFunc,给MCPServer进来就是为了具备MCP的能力,只是使用stdio的传输方式。contextFunc是为了让外部自定义的context可以进入到StdioServer,可以用于结束服务和控制超时.
// 为MCP Server 启用stdio
func ServeStdio(server *MCPServer, opts ...StdioOption) error {
// 创建Stdio服务器
s := NewStdioServer(server)
// 设置错误日志
s.SetErrorLogger(log.New(os.Stderr, "", log.LstdFlags))
// 应用选项
for _, opt := range opts {
opt(s)
}
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 设置信号通道
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
// 监听信号
go func() {
<-sigChan
cancel()
}()
// 开始监听
return s.Listen(ctx, os.Stdin, os.Stdout)
}
其中最关键的是Listen方法,它是 StdioServer 类型的一个关键方法,用于监听标准输入输出的 JSON-RPC 消息。
// Listen 启动监听,从提供的输入读取 JSON-RPC 消息,并将响应写入提供的输出。
// 它将持续运行,直到上下文被取消或发生错误。
// 如果在读取输入或写入输出时遇到问题,将返回错误。
func (s *StdioServer) Listen(
ctx context.Context, // 监听过程的上下文,用于控制生命周期和传递请求范围的信息
stdin io.Reader, // 标准输入流,用于读取客户端发送的 JSON-RPC 消息
stdout io.Writer, // 标准输出流,用于将响应写回客户端
) error {
// 由于标准输入输出只有一个客户端,因此设置一个静态客户端上下文,SessionId为stdio
if err := s.server.RegisterSession(&stdioSessionInstance); err != nil {
// 如果会话注册失败,返回错误
return fmt.Errorf("register session: %w", err)
}
// 确保在函数结束时注销会话
defer s.server.UnregisterSession(stdioSessionInstance.SessionID())
// 更新上下文,将会话信息加入
ctx = s.server.WithContext(ctx, &stdioSessionInstance)
// 如果存在自定义上下文函数,则应用该函数修改上下文
if s.contextFunc != nil {
ctx = s.contextFunc(ctx)
}
// 创建一个带缓冲的读取器,用于从标准输入流高效读取数据
reader := bufio.NewReader(stdin)
// 启动一个协程专门处理通知
go func() {
for {
select {
case notification := <-stdioSessionInstance.notifications:
// 收到通知时,调用 writeResponse 方法将通知写入标准输出
err := s.writeResponse(notification, stdout)
if err != nil {
// 如果写入通知时出错,记录错误日志
s.errLogger.Printf("Error writing notification: %v", err)
}
case <-ctx.Done():
// 如果上下文完成,退出协程
return
}
}
}()
// 主循环,用于处理输入消息
for {
select {
case <-ctx.Done():
// 如果上下文完成,返回上下文错误
return ctx.Err()
default:
// 使用协程使读取操作可取消
readChan := make(chan string, 1) // 用于接收读取到的行
errChan := make(chan error, 1) // 用于接收读取错误
go func() {
line, err := reader.ReadString('\n') // 读取一行输入
if err != nil {
errChan <- err // 发送读取错误
return
}
readChan <- line // 发送读取到的行
}()
select {
case <-ctx.Done():
// 如果上下文完成,返回上下文错误
return ctx.Err()
case err := <-errChan:
// 处理读取错误
if err == io.EOF {
// 如果是文件结束符,表示输入结束,返回 nil
return nil
}
// 其他错误则记录日志并返回
s.errLogger.Printf("Error reading input: %v", err)
return err
case line := <-readChan:
// 处理读取到的行
if err := s.processMessage(ctx, line, stdout); err != nil {
// 如果处理消息时出错
if err == io.EOF {
// 如果是文件结束符,返回 nil
return nil
}
// 其他错误则记录日志并返回
s.errLogger.Printf("Error handling message: %v", err)
return err
}
}
}
}
}
4.6.2 SSE
mcpServer := NewMCPServer()
sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080"))
log.Printf("SSE server listening on :8080")
if err := sseServer.Start(":8080"); err != nil {
log.Fatalf("Server error: %v", err)
}
现在已经知道如何在mark3labs/mcp-go中启用SSE了,现在来分析一下,它是如何实现的。
创建SSEServer
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
// It provides real-time communication capabilities over HTTP using the SSE protocol.
type SSEServer struct {
server *MCPServer // MCPServer 实例,用于处理实际的消息传递和通信逻辑
baseURL string // SSE 服务器的基础 URL,用于构建完整的端点路径
basePath string // SSE 服务器的基础路径,通常用于区分不同的服务或版本
messageEndpoint string // 消息端点的路径,客户端通过此端点发送 JSON-RPC 消息
sseEndpoint string // SSE 端点的路径,客户端通过此端点建立 SSE 连接
sessions sync.Map // 存储活动 SSE 会话的同步映射,用于跟踪和管理客户端连接
srv *http.Server // 内部的 HTTP 服务器实例,用于处理 HTTP 请求和响应
contextFunc SSEContextFunc // 可选的上下文函数,用于根据请求内容自定义上下文
}
与之前的StdioServer相比,SSEServer多了用于http中使用的URI、path,还有srv这是一个httpServer的类型,用于支持HTTP请求和响应。继续查看如何构建SSEServer:
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
s := &SSEServer{
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
}
// Apply all options
for _, opt := range opts {
opt(s)
}
return s
}
还是采用的才是经典Option模式,继续看看SSEOption有哪些选项:
名称 | 功能 | 使用方式 |
WithBaseURL | 设置 SSE 服务器的基础 URL | WithBaseURL("https://example.com") |
WithBasePath | 设置 SSE 服务器的基础路径 | WithBasePath("/v1") |
WithMessageEndpoint | 设置消息端点的路径 | WithMessageEndpoint("/custom-message") |
WithSSEEndpoint | 设置 SSE 端点的路径 | WithSSEEndpoint("/custom-sse") |
WithHTTPServer | 设置 HTTP 服务器实例(通常用于测试或自定义服务器配置) | WithHTTPServer(customHttpServer) |
WithContextFunc | 设置一个函数,用于根据请求内容自定义上下文 | WithContextFunc(func(ctx context.Context, r *http.Request) context.Context { ... }) |
func (s *SSEServer) Start(addr string) error {
s.srv = &http.Server{
Addr: addr,
Handler: s,
}
return s.srv.ListenAndServe()
}
还可以调用Shutdown()实现对服务器的关闭。
4.6.3 集成到gin框架
// 创建一个新的 Gin 引擎
r := gin.Default()
// 创建一个新的 MCPServer 实例(假设这是 SSEServer 所需的)
mcpServer := server.NewMCPServer("gin-mcp-server", "1.0.0") // 根据你的实际代码调整
// mcpServer 新加Tool、Resource、Prompt
// ... ...
// 创建一个新的 SSEServer 实例,并传入 MCPServer
sseServer := server.NewSSEServer(mcpServer)
// 将 SSEServer 的 SSE 端点和处理函数集成到 Gin 路由中
r.GET(sseServer.CompleteSsePath(), func(c *gin.Context) {
sseServer.ServeHTTP(c.Writer, c.Request)
})
// 将 SSEServer 的消息端点和处理函数集成到 Gin 路由中
r.POST(sseServer.CompleteMessagePath(), func(c *gin.Context) {
sseServer.ServeHTTP(c.Writer, c.Request)
})
// 启动 Gin 服务器
if err := r.Run("localhost:8081"); err != nil {
log.Fatalf("Gin server startup failed: %v", err)
}
上述的代码会生成两个路由:
路由
|
方法
|
作用
|
实例
|
/sse
|
GET
|
|
|
/message
|
POST
|
|
|
4.7 处理请求
4.7.1 路由入口
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 获取请求的路径
path := r.URL.Path
// 使用精确路径匹配,而不是模糊包含
ssePath := s.CompleteSsePath() // 获取完整的 SSE 路径
if ssePath != "" && path == ssePath {
// 如果请求路径与 SSE 路径匹配,则处理 SSE 请求
s.handleSSE(w, r)
return // 处理完成后直接返回,不再继续后续逻辑
}
// 获取消息处理的完整路径
messagePath := s.CompleteMessagePath()
if messagePath != "" && path == messagePath {
// 如果请求路径与消息处理路径匹配,则处理消息请求
s.handleMessage(w, r)
return // 处理完成后直接返回,不再继续后续逻辑
}
// 如果请求路径不匹配任何已知路径,则返回 404 未找到
http.NotFound(w, r)
}
其中:
-
SSE路径为:s.baseURL + s.basePath + s.sseEndpoint
-
Message路径为:s.baseURL + s.basePath + s.messageEndpoint
4.7.2 handleSSE
-
请求方法检查:只允许GET请求。
-
设置响应头:设置适当的SSE响应头。
-
创建会话:为每个客户端创建一个新的SSE会话。
-
注册和注销会话:在服务器中注册会话,并在处理完成后注销。
-
通知处理器:启动一个goroutine处理来自通知通道的事件,并将其发送到客户端。
-
主事件循环:处理来自事件队列的事件,并将其发送到客户端。
// 它设置适当的头信息并为客户端创建一个新的会话。
func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
// 1. 检查请求方法是否为GET,如果不是,返回405 Method Not Allowed错误
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// 2. 设置SSE相关的响应头
w.Header().Set("Content-Type", "text/event-stream") // 设置内容类型为text/event-stream
w.Header().Set("Cache-Control", "no-cache") // 禁用缓存
w.Header().Set("Connection", "keep-alive") // 保持连接活跃
w.Header().Set("Access-Control-Allow-Origin", "*") // 允许所有域跨域请求
// 检查ResponseWriter是否支持Flush,如果不支持,返回500 Internal Server Error错误
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
// 3. 创建一个新的会话ID和会话对象
sessionID := uuid.New().String() // 生成唯一的会话ID
session := &sseSession{
writer: w, // 响应写入器
flusher: flusher, // 刷新器
done: make(chan struct{}), // 用于通知会话结束的通道
eventQueue: make(chan string, 100), // 事件队列,缓冲区大小为100
sessionID: sessionID, // 会话ID
notificationChannel: make(chan mcp.JSONRPCNotification, 100), // 通知通道,缓冲区大小为100
}
// 4. 将会话存储到会话存储中,并在处理完成后删除
s.sessions.Store(sessionID, session)
defer s.sessions.Delete(sessionID)
// 在服务器中注册会话,如果注册失败,返回500 Internal Server Error错误
if err := s.server.RegisterSession(session); err != nil {
http.Error(w, fmt.Sprintf("Session registration failed: %v", err), http.StatusInternalServerError)
return
}
// 在处理完成后注销会话
defer s.server.UnregisterSession(sessionID)
// 5. 启动一个goroutine处理通知通道中的事件
go func() {
for {
select {
case notification := <-session.notificationChannel: // 从通知通道接收通知
eventData, err := json.Marshal(notification) // 将通知序列化为JSON
if err == nil {
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): // 将事件发送到事件队列
// 事件成功入队
case <-session.done: // 如果会话结束,退出goroutine
return
}
}
case <-session.done: // 如果会话结束,退出goroutine
return
case <-r.Context().Done(): // 如果请求上下文被取消,退出goroutine
return
}
}
}()
// 生成消息端点URL并发送初始的endpoint事件
messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID)
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) // 发送endpoint事件
flusher.Flush() // 刷新响应,确保事件立即发送到客户端
// 6. 主事件循环,运行在HTTP处理器goroutine中
for {
select {
case event := <-session.eventQueue: // 从事件队列接收事件
fmt.Fprint(w, event) // 将事件写入响应
flusher.Flush() // 刷新响应,确保事件立即发送到客户端
case <-r.Context().Done(): // 如果请求上下文被取消,关闭会话并退出
close(session.done)
return
}
}
}
第三阶段又看到Session了,与之前的stdioSession相比,sseSession明显复杂多了,它们都实现接口ClientSession:
type ClientSession interface {
// NotificationChannel provides a channel suitable for sending notifications to client.
NotificationChannel() chan<- mcp.JSONRPCNotification
// SessionID is a unique identifier used to track user session.
SessionID() string
}
sseSession是专门用于表示一个基于服务器发送事件(Server-Sent Events, SSE)协议的活跃连接。sseSession负责管理客户端与服务器之间的单向实时数据推送,并保持会话。其结构体如下:
type sseSession struct {
writer http.ResponseWriter // HTTP响应写入器,用于向客户端发送数据
flusher http.Flusher // HTTP刷新器,用于刷新响应缓冲区,确保数据立即发送给客户端
done chan struct{} // 用于通知会话结束的通道
eventQueue chan string // 用于排队事件的通道,存储待发送给客户端的事件
sessionID string // 会话的唯一标识符
notificationChannel chan mcp.JSONRPCNotification // 用于接收JSON-RPC通知的通道
}
4.7.3 handleMessage
-
请求验证:检查请求方法
-
检查请求方法:方法首先检查请求方法是否为 HTTP POST。如果不是,返回 "Method not allowed" 错误,并终止处理。
-
验证 sessionId 参数:从请求的 URL 查询参数中获取 sessionId。如果缺失,返回 "Missing sessionId" 错误。
-
加载会话:使用 sessionId 从会话存储中加载会话。如果会话不存在,返回 "Invalid session ID" 错误。
-
-
设置上下文
-
调用 s.server.WithContext 方法,将请求上下文和会话信息合并,生成新的上下文。如果提供了 contextFunc,则进一步处理上下文。
-
-
解析 JSON-RPC 消息
-
使用 json.NewDecoder 解析请求体中的原始 JSON 消息。如果解析失败,返回 "Parse error" 错误。
-
-
处理消息
-
将解析后的消息传递给 s.server.HandleMessage 方法进行处理,并获取响应结果。
-
-
发送响应如果 HandleMessage 返回了响应(非通知),则:如果 HandleMessage 没有返回响应(通知),则仅设置 HTTP 响应状态码为 202 Accepted,不发送响应体。
-
将响应编码为 JSON 格式。
-
将响应事件加入会话的事件队列,供 SSE 连接发送。
-
设置 HTTP 响应头为 application/json,状态码为 202 Accepted,并发送响应体。
-
-
事件队列处理
// handleMessage 处理来自客户端的JSON-RPC消息,并通过SSE连接和HTTP响应返回结果
func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
// 1. 如果请求方法不是POST,则返回JSON-RPC错误响应,表示方法不允许
if r.Method != http.MethodPost {
s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
return
}
// 从请求的URL查询参数中获取sessionId
sessionID := r.URL.Query().Get("sessionId")
// 如果sessionId为空,则返回JSON-RPC错误响应,表示缺少sessionId参数
if sessionID == "" {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
return
}
// 从session存储中加载与sessionId对应的session
sessionI, ok := s.sessions.Load(sessionID)
// 如果session不存在,则返回JSON-RPC错误响应,表示无效的session ID
if !ok {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
return
}
session := sessionI.(*sseSession)
// 在处理消息之前设置客户端上下文
ctx := s.server.WithContext(r.Context(), session)
// 如果提供了自定义的上下文函数,则应用它
if s.contextFunc != nil {
ctx = s.contextFunc(ctx, r)
}
// 将请求体解析为原始JSON消息
var rawMessage json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
// 如果解析失败,则返回JSON-RPC错误响应,表示解析错误
s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
return
}
// 通过MCPServer处理消息
response := s.server.HandleMessage(ctx, rawMessage)
// 如果存在响应(非通知),则发送响应
if response != nil {
// 将响应编码为JSON格式
eventData, _ := json.Marshal(response)
// 将事件排队以通过SSE发送
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
// 事件成功排队
case <-session.done:
// 会话已关闭,不尝试排队
default:
// 队列已满,可以记录此情况
}
// 发送HTTP响应
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
} else {
// 对于通知,只发送202 Accepted状态码,无响应体
w.WriteHeader(http.StatusAccepted)
}
}
-
尝试将原始消息解析为对应请求类型。
-
如果解析失败,记录错误信息。
-
执行请求前的钩子函数。
-
调用对应的处理函数处理请求。
-
如果处理过程中发生错误,执行错误钩子函数并返回错误响应。
-
执行请求后的钩子函数。
-
返回成功响应。
// 根据消息方法进行分情况处理
switch baseMessage.Method {
// 初始化请求处理
case mcp.MethodInitialize:
var request mcp.InitializeRequest
var result *mcp.InitializeResult
// 尝试将原始消息解析为初始化请求类型
if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil {
// 如果解析失败,记录错误信息
err = &requestError{
id: baseMessage.ID,
code: mcp.INVALID_REQUEST,
err: &UnparseableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method},
}
} else {
// 执行初始化请求前的钩子函数
s.hooks.beforeInitialize(baseMessage.ID, &request)
// 处理初始化请求
result, err = s.handleInitialize(ctx, baseMessage.ID, request)
}
// 如果处理过程中发生错误
if err != nil {
// 执行错误钩子函数
s.hooks.onError(baseMessage.ID, baseMessage.Method, &request, err)
// 返回错误响应
return err.ToJSONRPCError()
}
// 执行初始化请求后的钩子函数
s.hooks.afterInitialize(baseMessage.ID, &request, result)
// 返回成功响应
return createResponse(baseMessage.ID, *result)
// 其他方法处理逻辑类似,省略...
// 如果方法不匹配任何已知方法,返回方法未找到错误响应
default:
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
fmt.Sprintf("Method %s not found", baseMessage.Method),
)
}
代码比较多,可以自行查看:https://github.com/mark3labs/mcp-go/blob/e183dd17cfec07072a188f6169033bf61f7bf37d/server/request_handler.go#L12
五、总结

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
《跟老卫学仓颉编程语言开发》实战:猜数字游戏
以前中央电视台财经频道推出过一档大型演播室互动娱乐节目《购物街》,该节目里面包含了一个叫做“高了低了”的游戏环节。笔者非常喜爱这个游戏。这个游戏环节设置了八个百元价位左右的商品,首先选手要选择编号1-8的商品,之后猜这个商品的价格。在选手猜价格的过程中,主持人会给出高了、低了的提示,直到帮助选手猜出正确价格为止。之后继续选择,以此类推,直到30秒时间到为止。 本节所要介绍的猜数字游戏也是类似的,程序给出一个1到100之间的随机整数,让用户猜。用户猜一个数并输入到程序,然后程序会提示猜测是大了还是小了。如果猜对了,它会打印祝贺信息并退出。 本节通过仓颉语言,来开发一个简单的猜数字游戏,综合运用了流程控制、标准输入、字符串的操作、整型的比较等知识。 本节示例可以在“guessing_game”应用下找到。 输入数字 在程序界面输入数字代表用户猜数字的实现。如何实现在程序界面输入数字?这里就需要用到std.console包,该模块包含许多在执行输入和输出时需要的常见操作。 import std.console.* // 标准输入流(stdIn)读取一行 let line = Console....
-
下一篇
算力,并不是大模型厂商发展的护城河
上个月,OpenAI CEO 山姆·奥尔特曼在社交媒体上表示,号称 52 万亿参数量的 GPT-5 将在数月内发布。相较于上一代 GPT-4 的 2 万亿参数,体量上足足增长了 26 倍,虽无公布具体的训练成本,但想必也一定是个天文数字,堪称大模型领域的“力大砖飞”。 反观 LLM 界的“黑天鹅”,DeepSeek-V3 却仅用了 2048 块英伟达 H800 ,耗费了 557.6 万美金就完成了训练,一度引起硅谷恐慌,力证了:算力并非不可逾越的堡垒。 一边是暴力填鸭,一边是技术深化,2025 年的大模型似乎走出了两条截然不同的道路,也逐渐撕开了 AI 行业最残酷的真相:早期以“算力”建立护城河的大模型厂商们,在面对新一轮技术冲击时,高成本的算力反而成为了其灵活发展的累赘。 算力的必要性和局限性 作为数字经济的“新电力”,算力在大模型的训练和推理过程中确实起到了不可或缺的作用。 以 OpenAI 为例,早期在 GPT-4 的训练中,大概就使用了 25000 个 A100 芯片。如果 OpenAI 云计算的成本是差不多 1 美元/每 A100 小时的话,那么在这样的条件下,仅一次训练的成...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Crontab安装和使用
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池