浪潮云溪分布式数据库协议代码解析(1)
云溪数据库支持PostgreSQL protocol 3.0,用于客户端与服务端之间的信息通信,应用于连接认证及数据请求阶段。
PostgreSQL协议的消息通用格式如下图所示,包含1字节的消息类型,4字节的长度(不包括类型的长度),以及消息的内容。由于历史原因,startup消息不包含类型。
Part 1 - 连接认证阶段
1.用户使用客户端,通过云溪数据库 sql命令,尝试连接服务端时,客户端会获取连接命令的参数,生成URL,具体格式如下
postgres://<username>:<password>@<host>:<port>/<database>?<parameters>
其中,包含了当前用户的用户名和密码,节点的IP地址和端口,连接的数据库库名,以及额外的连接参数。
2.客户端会根据URL,建立与服务端之间的连接,发送一个startup消息。
func(c *Connector) open(ctx context.Context)(cn *conn, err error){...cn.startup(o)...}
上述代码,将构建一个startup消息,该消息没有消息类型,包含了协议版本号,和连接参数等内容。
3.服务端接收解析startup消息,获得连接参数。
func(s *Server) ServeConn(ctx context.Context, conn net.Conn)error{...var buf pgwirebase.ReadBuffern, err := buf.ReadUntypedMsg(conn)if err !=nil{return err}version, err := buf.GetUint32()if err !=nil{return err}...// get connection parametersif sArgs, err = parseOptions(ctx, buf.Msg); err !=nil{return sendErr(err)}...}
4.服务端发送AuthenticationRequest消息,要求客户端进一步提供认证信息,以进行用户身份认证。
func authPassword(c AuthConn,tlsState tls.ConnectionState,insecure bool,hashedPassword []byte,validUntil *tree.DTimestamp,encryption string,execCfg *sql.ExecutorConfig,entry *hba.Entry,)(security.UserAuthHook,error){if err := c.SendAuthRequest(authCleartextPassword,nil); err !=nil{returnnil,err}// recevice password from clientpassword, err := c.ReadPasswordString()...}
认证请求消息中,除了消息类型’R’外,还包含认证方式,目前云溪数据库支持证书、口令和GSSAPI三种认证方式。证书认证不需要额外的认证信息,认证通过后直接发送AuthenticationOk消息,跳过5、6。
5.客户端收到AuthenticationRequest消息后,则会发送对应的认证信息,回应此消息,该回应的消息类型为’p’。
func(cn *conn) startup(o values){...for{// recevice responses after sending startupt, r := cn.recv()switch t {case'K':cn.processBackendKeyData(r)case'S':cn.processParameterStatus(r)case'R':cn.auth(r, o)case'Z':cn.processReadyForQuery(r)returndefault:errorf("unknown response forstartup: %q",t)}}}func(cn *conn) auth(r *readBuf, o values){switch code := r.int32(); code {case0:// OKcase3:w := cn.writeBuf('p')w.string(o["password"])cn.send(w)...case7:// GSSAPI, startup...w := cn.writeBuf('p')w.bytes(token)cn.send(w)...}}
6.服务端收到认证回应后,进行用户的身份认证。
7.服务端认证完成后,给客户端发送认证结果。成功,发送AuthenticationOk(‘R’),authType为0;失败,则发送ErrorResponse(‘E’),连接过程结束。
func(c *conn) handleAuthentication(ctx context.Context,insecure bool,ie *sql.InternalExecutor,auth *hba.Conf,execCfg *sql.ExecutorConfig,)(authErr error){...c.msgBuilder.initMsg(pgwirebase.ServerMsgAuth)c.msgBuilder.putInt32(authOK)return c.msgBuilder.finishMsg(c.conn)}
8.服务端认证完成后,将发送多条参数信息ParameterStatus(‘S’),包括server_version, client_encoding 和 DateStyle 等参数。每个参数,都会发送一条ParameterStatus消息。
func(c *conn) serveImpl(ctx context.Context,draining func()bool,sqlServer *sql.Server,reserved mon.BoundAccount,stopper *stop.Stopper,)error{...sendStatusParam:=func(param, value string)error{c.msgBuilder.initMsg(pgwirebase.ServerMsgParameterStatus)c.msgBuilder.writeTerminatedString(param)c.msgBuilder.writeTerminatedString(value)return c.msgBuilder.finishMsg(c.conn)}...for _, param :=range statusReportParams {value := connHandler.GetStatusParam(ctx, param)if err := sendStatusParam(param, value); err !=nil{return err}}}...}
9.服务端发送ReadyForQuery(‘Z’),表示一切准备就绪,通知客户端可以发送SQL请求了。
func(c *conn) serveImpl(ctx context.Context,draining func()bool,sqlServer *sql.Server,reserved mon.BoundAccount,stopper *stop.Stopper,)error{...// An initial readyForQuery message is part of the handshake.c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))if err := c.msgBuilder.finishMsg(c.conn); err !=nil{return err}...}
至此,客户端与服务端之间,已经成功建立起连接,用户可以执行后续操作了。

