diff --git a/pkg/sql/colexec/colexecjoin/BUILD.bazel b/pkg/sql/colexec/colexecjoin/BUILD.bazel index 11a21e7ee9e8..a44ed456ad3e 100644 --- a/pkg/sql/colexec/colexecjoin/BUILD.bazel +++ b/pkg/sql/colexec/colexecjoin/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", # keep "//pkg/sql/types", + "//pkg/util", "//pkg/util/duration", # keep "//pkg/util/mon", "@com_github_cockroachdb_apd_v2//:apd", # keep diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go index 78b299b0a1bd..74df84023034 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go @@ -10,6 +10,8 @@ package colexecjoin +import "github.com/cockroachdb/cockroach/pkg/util" + // circularGroupsBuffer is a struct designed to store the groups' slices for a // given column. It starts out small and will grow dynamically if necessary // until pre-computed maximum capacity (since we know that there is a maximum @@ -45,7 +47,12 @@ type circularGroupsBuffer struct { // groupsBufferInitialSize determines the size used in initial allocations of // the slices of the circularGroupsBuffer. -const groupsBufferInitialSize = 8 +var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange( + "merge-joiner-groups-buffer", + 8, /* defaultValue */ + 1, /* min */ + 16, /* max */ +) func makeGroupsBuffer(batchSize int) circularGroupsBuffer { return circularGroupsBuffer{ @@ -156,8 +163,8 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { newLeftGroups := make([]group, getGroupsSlicesLen(newSize)) newRightGroups := make([]group, getGroupsSlicesLen(newSize)) // Note that this if block is never reached when startIdx == endIdx (because - // that would indicate an empty buffer and we would enough capacity given - // our initialization in makeGroupsBuffer). 
+ // that would indicate an empty buffer and we would have enough capacity + // given our initialization in makeGroupsBuffer). if b.startIdx <= b.endIdx { // Current groups are contiguous in the slices, so copying them over is // simple. @@ -165,28 +172,43 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx]) b.nextColStartIdx -= b.startIdx } else { - // Current groups are wrapped at position b.capacity. Namely, if we have - // size = 3, capacity = 7, we might have the following: - // buffer = [1, 2, 0', 1', 2', x, 0] - // where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to - // copy over with the adjustments. + // Current groups are wrapped after position b.capacity-1. Namely, if we + // have size = 3, capacity = 7, we might have the following: + // buffer = [1, 2, 0', 1', 2', x, 0] (1) + // where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to + // copy over with the adjustments so that the resulting state is + // buffer = [0, 1, 2, 0', 1', 2', x] + // where startIdx = 0, endIdx = 6, nextColStartIdx = 3. + // + // First, copy over the start of the buffer (which is currently at the + // end of the old slices) into the beginning of the new slices. In the + // example above, we're copying [0]. copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity]) copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity]) + // If non-empty, copy over the end of the buffer (which is currently at + // the beginning of the old slices). In the example above, we're copying + // [1, 2, 0', 1', 2']. if b.endIdx > 0 { offset := b.capacity - b.startIdx copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx]) copy(newRightGroups[offset:], b.rightGroups[:b.endIdx]) - b.nextColStartIdx += offset - } else { - // There is a special case when endIdx is 0 - we're simply shifting - // the groups by startIdx to the left (which we have already done) - // which requires adjusting nextColStartIdx accordingly. 
- // - // Consider the following, size = 3, capacity = 7 - // buffer = [x, 0, 1, 2, 0', 1', 2'] - // with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need - // to copy anything, but we need to decrement nextColStartIdx by 1. + } + // Now update b.nextColStartIdx. There are two cases: + // 1. it was in the part we copied first. In such a scenario we need to + // shift the index towards the beginning by the same offset as when + // we were copying (by b.startIdx). + // For example, consider the following, size = 3, capacity = 7 + // buffer = [2', x, 0, 1, 2, 0', 1'] + // with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to + // decrement nextColStartIdx by 2. + // 2. it was in the part we copied second. In such a scenario we need to + // shift the index towards the end by the same offset as when we were + // copying (by b.capacity-b.startIdx). Consider the example (1) + // above: we need to increment nextColStartIdx by 1. + if b.nextColStartIdx >= b.startIdx { b.nextColStartIdx -= b.startIdx + } else { + b.nextColStartIdx += b.capacity - b.startIdx } } b.startIdx = 0