- 数据请求阶段 -
Part 1 - 简单查询
![]()
1. 客户端发送Query (‘Q’)消息给服务端,包含了一条字符串类型的SQL语句。
func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){ ... // Check to see if we can use the "simpleQuery"interface, which is // *much* faster than going through prepare/exec iflen(args)==0{ return cn.simpleQuery(query) } ...}
2. 服务端收到Query 消息,解析SQL语句,生成抽象语法树(AST),并传给执行器执行,获得结果。
func(c *conn) serveImpl( ctx context.Context, draining func()bool, sqlServer *sql.Server, reserved mon.BoundAccount, stopper *stop.Stopper,)error{ ...Loop: for{ typ, n, err = c.readBuf.ReadTypedMsg(&c.rd) if err !=nil{ break Loop } ... switch typ { case pgwirebase.ClientMsgSimpleQuery: ... case pgwirebase.ClientMsgExecute: ... case pgwirebase.ClientMsgParse: ... case pgwirebase.ClientMsgDescribe: ... case pgwirebase.ClientMsgBind: ... case pgwirebase.ClientMsgSync: ... } } ...}
3. 服务端根据SQL结果,首先发送RowDescription(B:‘T’)消息,包含列的数量,列名,列的类型等参数。
func(c *conn) writeRowDescription( ctx context.Context, columns []sqlbase.ResultColumn, formatCodes []pgwirebase.FormatCode, w io.Writer,)error{ c.msgBuilder.initMsg(pgwirebase.ServerMsgRowDescription) c.msgBuilder.putInt16(int16(len(columns))) for i, column :=range columns { ... c.msgBuilder.writeTerminatedString(column.Name) ... c.msgBuilder.putInt32(0)//Table OID (optional). c.msgBuilder.putInt16(0)//Column attribute ID (optional). c.msgBuilder.putInt32(int32(typ.oid)) c.msgBuilder.putInt16(int16(typ.size)) ... } ...}
4. RowDescription消息后面将跟着多个DataRow(B:‘D’)消息,每个DataRow消息包含一行的数据。
func(c *conn) bufferRow( ctx context.Context, row tree.Datums, formatCodes []pgwirebase.FormatCode, convsessiondata.DataConversionConfig, types []*types.T,){ c.msgBuilder.initMsg(pgwirebase.ServerMsgDataRow) c.msgBuilder.putInt16(int16(len(row))) for i, col :=range row { ... switch fmtCode { case pgwirebase.FormatText: c.msgBuilder.writeTextDatum(ctx, col, conv, types[i]) case pgwirebase.FormatBinary: c.msgBuilder.writeBinaryDatum(ctx, col, conv.Location, types[i]) ... } if err := c.msgBuilder.finishMsg(&c.writerState.buf); err !=nil{ panic(fmt.Sprintf("unexpected err from buffer: %s", err)) }}
5. 发送CommandComplete(B:‘C’)消息表示这个SQL请求执行结束了。
6. 服务端发送ReadyForQuery(‘Z’),通知客户端可以发送下一条SQL请求了。
func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){ ... switch r.typ { case commandComplete: tag := cookTag( r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected, ) r.conn.bufferCommandComplete(tag) case parseComplete: r.conn.bufferParseComplete() case bindComplete: r.conn.bufferBindComplete() case closeComplete: r.conn.bufferCloseComplete() case readyForQuery: r.conn.bufferReadyForQuery(byte(t)) // The error is saved on conn.err. _ /* err */= r.conn.Flush(r.pos) ... } ...}
7.客户端根据接受SQL请求的结果。
func(cn *conn) simpleQuery(q string)(res *rows, err error){ b := cn.writeBuf('Q') b.string(q) cn.send(b)
for{ t, r := cn.recv1() switch t { case'C','I': ... case'Z': ... case'E': ... case'D': ... case'T': ... } }}
Part 2 - 扩展查询
1. 客户端发送扩展查询请求,依次发送Parse (F:‘P’), Bind (F:‘B’), Describe (F:‘D’), Execute (F:‘E’), Sync(F:‘S’)消息。
func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){ ... if cn.binaryParameters { cn.sendBinaryModeQuery(query, args) cn.readParseResponse() cn.readBindResponse() rows :=&rows{cn: cn} rows.rowsHeader = cn.readPortalDescribeResponse() cn.postExecuteWorkaround() return rows,nil } ...}
func(cn *conn) sendBinaryModeQuery(query string, args []driver.Value){ b := cn.writeBuf('P') b.byte(0)//unnamed statement b.string(query) b.int16(0)
b.next('B') b.int16(0)//unnamed portal and statement cn.sendBinaryParameters(b, args) b.bytes(colFmtDataAllText)
b.next('D') b.byte('P') b.byte(0)//unnamed portal
b.next('E') b.byte(0) b.int32(0)
b.next('S') cn.send(b)}
2. 服务端处理扩展查询请求。
3. 服务端发送回应消息,ParseComplete (B:‘1’), BindComplete (B:‘2’), ParameterDescription(B:‘t’), CommandComplete (B:‘C’), CloseComplete (B:‘3’), ReadyForQuery (B:‘Z’)。
func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){ ... switch r.typ { case commandComplete: tag := cookTag( r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected, ) r.conn.bufferCommandComplete(tag) case parseComplete: r.conn.bufferParseComplete() case bindComplete: r.conn.bufferBindComplete() case closeComplete: r.conn.bufferCloseComplete() case readyForQuery: r.conn.bufferReadyForQuery(byte(t)) // The error is saved on conn.err. _ /* err */= r.conn.Flush(r.pos) ... } ...}