From 1ccc551c40f1cea7bf5997a1450dc29513f39178 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Tue, 18 Apr 2023 16:49:08 -0400 Subject: [PATCH] copy: fix extra flush loop causing test to flake Previously we would try again and increment batch number when a retriable error occurred, now we call a hook on retries so we can uncount that batch. Also don't bother calling the insert routine if the length of the batch is 0, it will be a noop but still call the hook incrementing the batch counter. Release note: None Epic: None Fixes: #101610 --- pkg/sql/copy/copy_test.go | 4 ++++ pkg/sql/copy_from.go | 9 +++++++-- pkg/sql/exec_util.go | 3 +++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/sql/copy/copy_test.go b/pkg/sql/copy/copy_test.go index d75aaeb2e44e..64e392a90e15 100644 --- a/pkg/sql/copy/copy_test.go +++ b/pkg/sql/copy/copy_test.go @@ -548,6 +548,10 @@ func TestLargeDynamicRows(t *testing.T) { batchNumber++ return nil }, + CopyFromInsertRetry: func() error { + batchNumber-- + return nil + }, } s, _, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) diff --git a/pkg/sql/copy_from.go b/pkg/sql/copy_from.go index 4649f93738d8..0fad84aeb5c6 100644 --- a/pkg/sql/copy_from.go +++ b/pkg/sql/copy_from.go @@ -642,9 +642,9 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo // If we have a full batch of rows or we have exceeded maxRowMem process // them. Only set finalBatch to true if this is the last // CopyData segment AND we have no more data in the buffer. - if len := c.currentBatchSize(); c.rowsMemAcc.Used() > c.maxRowMem || len == c.copyBatchRowSize || batchDone { + if len := c.currentBatchSize(); len > 0 && (c.rowsMemAcc.Used() > c.maxRowMem || len >= c.copyBatchRowSize || batchDone) { if len != c.copyBatchRowSize { - log.VEventf(ctx, 2, "copy batch of %d rows flushing due to memory usage %d > %d", c.batch.Length(), c.rowsMemAcc.Used(), c.maxRowMem) + log.VEventf(ctx, 2, "copy batch of %d rows flushing due to memory usage %d > %d", len, c.rowsMemAcc.Used(), c.maxRowMem) } if err := c.processRows(ctx, final && c.buf.Len() == 0); err != nil { return err @@ -1026,6 +1026,11 @@ func (c *copyMachine) insertRows(ctx context.Context, finalBatch bool) error { // NOTE: in theory we can also retry if c.insertRows == 0. if c.implicitTxn && !c.p.SessionData().CopyFromAtomicEnabled && c.p.SessionData().CopyFromRetriesEnabled && errIsRetriable(err) { log.SqlExec.Infof(ctx, "%s failed on attempt %d and is retrying, error %+v", c.copyFromAST.String(), r.CurrentAttempt(), err) + if c.p.ExecCfg().TestingKnobs.CopyFromInsertRetry != nil { + if err := c.p.ExecCfg().TestingKnobs.CopyFromInsertRetry(); err != nil { + return err + } + } continue } return err diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 2937924be310..fe0c6f55cd63 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1613,6 +1613,9 @@ type ExecutorTestingKnobs struct { // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. BeforeCopyFromInsert func() error + // CopyFromInsertRetry, if set, will be called when a COPY FROM insert statement is retried. + CopyFromInsertRetry func() error + // ForceSQLLivenessSession will force the use of a sqlliveness session for // transaction deadlines even in the system tenant. ForceSQLLivenessSession bool