From a11d85938e0aa91c114201c61d5d2c6daa90963e Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Wed, 23 Feb 2022 09:20:42 -0800 Subject: [PATCH 1/2] add buffering to prepared statement result sets --- go/mysql/conn.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/go/mysql/conn.go b/go/mysql/conn.go index fd8331e1762..cab400c75cb 100644 --- a/go/mysql/conn.go +++ b/go/mysql/conn.go @@ -1098,6 +1098,11 @@ func (c *Conn) handleNextCommand(handler Handler) error { } case ComStmtExecute: + // flush is called at the end of this block. + // We cannot encapsulate it with a defer inside a func because + // we have to return from this func if it fails. + c.startWriterBuffering() + queryStart := time.Now() stmtID, _, err := c.parseComStmtExecute(c.PrepareData, data) c.recycleReadPacket() @@ -1176,6 +1181,11 @@ func (c *Conn) handleNextCommand(handler Handler) error { } timings.Record(queryTimingKey, queryStart) + if err := c.flush(); err != nil { + log.Errorf("Conn %v: Flush() failed: %v", c.ID(), err) + return err + } + case ComStmtSendLongData: stmtID, paramID, chunkData, ok := c.parseComStmtSendLongData(data) c.recycleReadPacket() From bdb5657e1ab94f7cda819c60eb5e76327c9ec688 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Wed, 23 Feb 2022 09:46:16 -0800 Subject: [PATCH 2/2] factored out executePreparedStatement(), cleaned up error handling --- go/mysql/conn.go | 125 +++++++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 59 deletions(-) diff --git a/go/mysql/conn.go b/go/mysql/conn.go index cab400c75cb..3b953482679 100644 --- a/go/mysql/conn.go +++ b/go/mysql/conn.go @@ -914,8 +914,8 @@ func (c *Conn) handleNextCommand(handler Handler) error { } case ComQuery: // flush is called at the end of this block. - // We cannot encapsulate it with a defer inside a func because - // we have to return from this func if it fails. + // To simplify error handling, we do not + // encapsulate it with a defer'd func() c.startWriterBuffering() queryStart := time.Now() @@ -1099,8 +1099,8 @@ func (c *Conn) handleNextCommand(handler Handler) error { case ComStmtExecute: // flush is called at the end of this block. - // We cannot encapsulate it with a defer inside a func because - // we have to return from this func if it fails. + // To simplify error handling, we do not + // encapsulate it with a defer'd func() c.startWriterBuffering() queryStart := time.Now() @@ -1121,63 +1121,11 @@ func (c *Conn) handleNextCommand(handler Handler) error { log.Error("Error writing query error to client %v: %v", c.ConnectionID, werr) return werr } - return nil + return c.flush() } - fieldSent := false - // sendFinished is set if the response should just be an OK packet. - sendFinished := false - prepare := c.PrepareData[stmtID] - err = handler.ComStmtExecute(c, prepare, func(qr *sqltypes.Result) error { - if sendFinished { - // Failsafe: Unreachable if server is well-behaved. - return io.EOF - } - - if !fieldSent { - fieldSent = true - - if len(qr.Fields) == 0 { - sendFinished = true - // We should not send any more packets after this. - return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0) - } - if err := c.writeFields(qr); err != nil { - return err - } - } - - return c.writeBinaryRows(qr) - }) - - // If no field was sent, we expect an error. - if !fieldSent { - // This is just a failsafe. Should never happen. - if err == nil || err == io.EOF { - err = NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error")) - } - if werr := c.writeErrorPacketFromError(err); werr != nil { - // If we can't even write the error, we're done. - log.Errorf("Error writing query error to %s: %v", c, werr) - return werr - } - } else { - if err != nil { - // We can't send an error in the middle of a stream. - // All we can do is abort the send, which will cause a 2013. - log.Errorf("Error in the middle of a stream to %s: %v", c, err) - return err - } - - // Send the end packet only sendFinished is false (results were streamed). - // In this case the affectedRows and lastInsertID are always 0 since it - // was a read operation. - if !sendFinished { - if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil { - log.Errorf("Error writing result to %s: %v", c, err) - return err - } - } + if err = c.execPrepareStatement(stmtID, handler); err != nil { + return err } timings.Record(queryTimingKey, queryStart) @@ -1374,6 +1322,65 @@ func (c *Conn) execQuery(query string, handler Handler, multiStatements bool) (s return remainder, nil } +func (c *Conn) execPrepareStatement(stmtID uint32, handler Handler) (err error) { + fieldSent := false + // sendFinished is set if the response should just be an OK packet. + sendFinished := false + prepare := c.PrepareData[stmtID] + err = handler.ComStmtExecute(c, prepare, func(qr *sqltypes.Result) error { + if sendFinished { + // Failsafe: Unreachable if server is well-behaved. + return io.EOF + } + + if !fieldSent { + fieldSent = true + + if len(qr.Fields) == 0 { + sendFinished = true + // We should not send any more packets after this. + return c.writeOKPacket(qr.RowsAffected, qr.InsertID, c.StatusFlags, 0) + } + if err := c.writeFields(qr); err != nil { + return err + } + } + + return c.writeBinaryRows(qr) + }) + + // If no field was sent, we expect an error. + if !fieldSent { + // This is just a failsafe. Should never happen. + if err == nil || err == io.EOF { + err = NewSQLErrorFromError(errors.New("unexpected: query ended without no results and no error")) + } + if werr := c.writeErrorPacketFromError(err); werr != nil { + // If we can't even write the error, we're done. + log.Errorf("Error writing query error to %s: %v", c, werr) + return werr + } + } else { + if err != nil { + // We can't send an error in the middle of a stream. + // All we can do is abort the send, which will cause a 2013. + log.Errorf("Error in the middle of a stream to %s: %v", c, err) + return err + } + + // Send the end packet only sendFinished is false (results were streamed). + // In this case the affectedRows and lastInsertID are always 0 since it + // was a read operation. + if !sendFinished { + if err := c.writeEndResult(false, 0, 0, handler.WarningCount(c)); err != nil { + log.Errorf("Error writing result to %s: %v", c, err) + return err + } + } + } + return +} + // // Packet parsing methods, for generic packets. //