Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/mysql: Add buffering to prepared statement result sets #126

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 74 additions & 57 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,8 @@ func (c *Conn) handleNextCommand(handler Handler) error {
}
case ComQuery:
// flush is called at the end of this block.
// We cannot encapsulate it with a defer inside a func because
// we have to return from this func if it fails.
// To simplify error handling, we do not
// encapsulate it with a defer'd func()
c.startWriterBuffering()

queryStart := time.Now()
Expand Down Expand Up @@ -1098,6 +1098,11 @@ func (c *Conn) handleNextCommand(handler Handler) error {
}

case ComStmtExecute:
// flush is called at the end of this block.
// To simplify error handling, we do not
// encapsulate it with a defer'd func()
c.startWriterBuffering()

queryStart := time.Now()
stmtID, _, err := c.parseComStmtExecute(c.PrepareData, data)
c.recycleReadPacket()
Expand All @@ -1116,66 +1121,19 @@ func (c *Conn) handleNextCommand(handler Handler) error {
log.Error("Error writing query error to client %v: %v", c.ConnectionID, werr)
return werr
}
return nil
return c.flush()
}

fieldSent := false
// sendFinished is set if the response should just be an OK packet.
sendFinished := false
prepare := c.PrepareData[stmtID]
err = handler.ComStmtExecute(c, prepare, func(qr *sqltypes.Result) error {
if sendFinished {
// Failsafe: Unreachable if server is well-behaved.
return io.EOF
}

if !fieldSent {
fieldSent = true

if len(qr.Fields) == 0 {
sendFinished = true
// We should not send any more packets after this.
return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0)
}
if err := c.writeFields(qr); err != nil {
return err
}
}

return c.writeBinaryRows(qr)
})

// If no field was sent, we expect an error.
if !fieldSent {
// This is just a failsafe. Should never happen.
if err == nil || err == io.EOF {
err = NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error"))
}
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Errorf("Error writing query error to %s: %v", c, werr)
return werr
}
} else {
if err != nil {
// We can't send an error in the middle of a stream.
// All we can do is abort the send, which will cause a 2013.
log.Errorf("Error in the middle of a stream to %s: %v", c, err)
return err
}

// Send the end packet only sendFinished is false (results were streamed).
// In this case the affectedRows and lastInsertID are always 0 since it
// was a read operation.
if !sendFinished {
if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return err
}
}
if err = c.execPrepareStatement(stmtID, handler); err != nil {
return err
}

timings.Record(queryTimingKey, queryStart)
if err := c.flush(); err != nil {
log.Errorf("Conn %v: Flush() failed: %v", c.ID(), err)
return err
}

case ComStmtSendLongData:
stmtID, paramID, chunkData, ok := c.parseComStmtSendLongData(data)
c.recycleReadPacket()
Expand Down Expand Up @@ -1364,6 +1322,65 @@ func (c *Conn) execQuery(query string, handler Handler, multiStatements bool) (s
return remainder, nil
}

func (c *Conn) execPrepareStatement(stmtID uint32, handler Handler) (err error) {
fieldSent := false
// sendFinished is set if the response should just be an OK packet.
sendFinished := false
prepare := c.PrepareData[stmtID]
err = handler.ComStmtExecute(c, prepare, func(qr *sqltypes.Result) error {
if sendFinished {
// Failsafe: Unreachable if server is well-behaved.
return io.EOF
}

if !fieldSent {
fieldSent = true

if len(qr.Fields) == 0 {
sendFinished = true
// We should not send any more packets after this.
return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0)
}
if err := c.writeFields(qr); err != nil {
return err
}
}

return c.writeBinaryRows(qr)
})

// If no field was sent, we expect an error.
if !fieldSent {
// This is just a failsafe. Should never happen.
if err == nil || err == io.EOF {
err = NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error"))
}
if werr := c.writeErrorPacketFromError(err); werr != nil {
// If we can't even write the error, we're done.
log.Errorf("Error writing query error to %s: %v", c, werr)
return werr
}
} else {
if err != nil {
// We can't send an error in the middle of a stream.
// All we can do is abort the send, which will cause a 2013.
log.Errorf("Error in the middle of a stream to %s: %v", c, err)
return err
}

// Send the end packet only sendFinished is false (results were streamed).
// In this case the affectedRows and lastInsertID are always 0 since it
// was a read operation.
if !sendFinished {
if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil {
log.Errorf("Error writing result to %s: %v", c, err)
return err
}
}
}
return
}

//
// Packet parsing methods, for generic packets.
//
Expand Down