Skip to content

Commit

Permalink
Merge pull request cockroachdb#62876 from yuzefovich/backport21.1-62785
Browse files Browse the repository at this point in the history
release-21.1: colexecjoin: fix the merge joiner in some cases
  • Loading branch information
yuzefovich authored Apr 2, 2021
2 parents a314396 + e9257e9 commit b981854
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/duration", # keep
"//pkg/util/mon",
"@com_github_cockroachdb_apd_v2//:apd", # keep
Expand Down
58 changes: 40 additions & 18 deletions pkg/sql/colexec/colexecjoin/mergejoiner_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package colexecjoin

import "github.com/cockroachdb/cockroach/pkg/util"

// circularGroupsBuffer is a struct designed to store the groups' slices for a
// given column. It starts out small and will grow dynamically if necessary
// until pre-computed maximum capacity (since we know that there is a maximum
Expand Down Expand Up @@ -45,7 +47,12 @@ type circularGroupsBuffer struct {

// groupsBufferInitialSize determines the size used in initial allocations of
// the slices of the circularGroupsBuffer.
const groupsBufferInitialSize = 8
// groupsBufferInitialSize is the size used for the initial allocations of the
// slices of the circularGroupsBuffer. Its default value is 8.
// NOTE(review): presumably util.ConstantWithMetamorphicTestRange returns the
// default value in regular builds and a value drawn from the given min/max
// range (1 to 16 here) in metamorphic test builds, so that the buffer growth
// paths are exercised with small sizes - confirm against pkg/util, including
// whether the max bound is inclusive.
var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange(
"merge-joiner-groups-buffer",
8, /* defaultValue */
1, /* min */
16, /* max */
)

func makeGroupsBuffer(batchSize int) circularGroupsBuffer {
return circularGroupsBuffer{
Expand Down Expand Up @@ -156,37 +163,52 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() {
newLeftGroups := make([]group, getGroupsSlicesLen(newSize))
newRightGroups := make([]group, getGroupsSlicesLen(newSize))
// Note that this if block is never reached when startIdx == endIdx (because
// that would indicate an empty buffer and we would enough capacity given
// our initialization in makeGroupsBuffer).
// that would indicate an empty buffer and we would have enough capacity
// given our initialization in makeGroupsBuffer).
if b.startIdx <= b.endIdx {
// Current groups are contiguous in the slices, so copying them over is
// simple.
copy(newLeftGroups, b.leftGroups[b.startIdx:b.endIdx])
copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx])
b.nextColStartIdx -= b.startIdx
} else {
// Current groups are wrapped at position b.capacity. Namely, if we have
// size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0]
// where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to
// copy over with the adjustments.
// Current groups are wrapped after position b.capacity-1. Namely, if we
// have size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0] (1)
// where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to
// copy over with the adjustments so that the resulting state is
// buffer = [0, 1, 2, 0', 1', 2', x]
// where startIdx = 0, endIdx = 6, nextColStartIdx = 3.
//
// First, copy over the start of the buffer (which is currently at the
// end of the old slices) into the beginning of the new slices. In the
// example above, we're copying [0].
copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity])
copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity])
// If non-empty, copy over the end of the buffer (which is currently at
// the beginning of the old slices). In the example above, we're copying
// [1, 2, 0', 1', 2'].
if b.endIdx > 0 {
offset := b.capacity - b.startIdx
copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx])
copy(newRightGroups[offset:], b.rightGroups[:b.endIdx])
b.nextColStartIdx += offset
} else {
// There is a special case when endIdx is 0 - we're simply shifting
// the groups by startIdx to the left (which we have already done)
// which requires adjusting nextColStartIdx accordingly.
//
// Consider the following, size = 3, capacity = 7
// buffer = [x, 0, 1, 2, 0', 1', 2']
// with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need
// to copy anything, but we need to decrement nextColStartIdx by 1.
}
// Now update b.nextColStartIdx. There are two cases:
// 1. it was in the part we copied first. In such scenario we need to
// shift the index towards the beginning by the same offset as when
// we were copying (by b.startIdx).
// For example, consider the following, size = 3, capacity = 7
// buffer = [2', x, 0, 1, 2, 0', 1']
// with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to
// decrement nextColStartIdx by 2.
// 2. it was in the part we copied second. In such scenario we need to
// shift the index towards the end by the same offset as when we were
// copying (by b.capacity-b.startIdx). Consider the example (1)
// above: we need to increment nextColStartIdx by 1.
if b.nextColStartIdx >= b.startIdx {
b.nextColStartIdx -= b.startIdx
} else {
b.nextColStartIdx += b.capacity - b.startIdx
}
}
b.startIdx = 0
Expand Down

0 comments on commit b981854

Please sign in to comment.