Skip to content

Commit

Permalink
Merge pull request #9511 from jcalvert/index_compaction_breakup
Browse files Browse the repository at this point in the history
mvcc: Clone for batch index compaction and shorten lock
  • Loading branch information
gyuho authored Apr 18, 2018
2 parents d0847f4 + f176427 commit e5c9483
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975).
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572).
- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy on write clone to avoid holding the lock for the traversal of the entire index.

### Breaking Changes

Expand Down
44 changes: 20 additions & 24 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,27 +185,34 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
}
ti.Unlock()
return true
})
return available
}

Expand All @@ -222,17 +229,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

Expand Down
42 changes: 42 additions & 0 deletions mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
"testing"

"go.uber.org/zap"
)

func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) }
func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) }
func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) }
func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) }
func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }

func benchmarkIndexCompact(b *testing.B, size int) {
log := zap.NewNop()
kvindex := newTreeIndex(log)

bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Compact(int64(i))
}
}
9 changes: 6 additions & 3 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,18 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev

func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock()
defer s.revMu.Unlock()

if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
s.mu.Unlock()
s.revMu.Unlock()
return ch, ErrCompacted
}
if rev > s.currentRev {
s.mu.Unlock()
s.revMu.Unlock()
return nil, ErrFutureRev
}

Expand All @@ -245,6 +246,8 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()

s.mu.Unlock()
s.revMu.Unlock()
keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
Expand Down

0 comments on commit e5c9483

Please sign in to comment.