Skip to content

Commit

Permalink
store: add lock for runtime stats to fix panic caused by concurrent e…
Browse files Browse the repository at this point in the history
…xecution (#18983)
  • Loading branch information
crazycs520 authored Aug 5, 2020
1 parent cd3e5ed commit 8976ffe
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
9 changes: 9 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ func (s *testSuiteJoin1) TestJoinPanic(c *C) {
tk.MustQuery("SELECT * FROM events e JOIN (SELECT MAX(clock) AS clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
err := tk.ExecToErr("SELECT * FROM events e JOIN (SELECT clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
c.Check(err, NotNil)

// Test for PR 18983, use to detect race.
tk.MustExec("use test")
tk.MustExec("drop table if exists tpj1,tpj2;")
tk.MustExec("create table tpj1 (id int, b int, unique index (id));")
tk.MustExec("create table tpj2 (id int, b int, unique index (id));")
tk.MustExec("insert into tpj1 values (1,1);")
tk.MustExec("insert into tpj2 values (1,1);")
tk.MustQuery("select tpj1.b,tpj2.b from tpj1 left join tpj2 on tpj1.id=tpj2.id where tpj1.id=1;").Check(testkit.Rows("1 1"))
}

func (s *testSuite) TestJoinInDisk(c *C) {
Expand Down
40 changes: 25 additions & 15 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type tikvSnapshot struct {
sync.RWMutex
hitCnt int64
cached map[string][]byte
stats *SnapshotRuntimeStats
}
stats *SnapshotRuntimeStats
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}
if s.stats != nil {
if s.mu.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
Client: s.store.client,
resolveLite: true,
}
if s.stats != nil {
if s.mu.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
Expand Down Expand Up @@ -452,7 +452,9 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
case kv.TaskID:
s.taskID = val.(uint64)
case kv.CollectRuntimeStats:
s.stats = val.(*SnapshotRuntimeStats)
s.mu.Lock()
s.mu.stats = val.(*SnapshotRuntimeStats)
s.mu.Unlock()
}
}

Expand All @@ -462,7 +464,9 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
case kv.ReplicaRead:
s.replicaRead = kv.ReplicaReadLeader
case kv.CollectRuntimeStats:
s.stats = nil
s.mu.Lock()
s.mu.stats = nil
s.mu.Unlock()
}
}

Expand Down Expand Up @@ -579,35 +583,41 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
if s.stats == nil || bo.totalSleep == 0 {
if s.mu.stats == nil || bo.totalSleep == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.backoffSleepMS == nil {
s.stats.backoffSleepMS = bo.backoffSleepMS
s.stats.backoffTimes = bo.backoffTimes
if s.mu.stats == nil {
return
}
if s.mu.stats.backoffSleepMS == nil {
s.mu.stats.backoffSleepMS = bo.backoffSleepMS
s.mu.stats.backoffTimes = bo.backoffTimes
return
}
for k, v := range bo.backoffSleepMS {
s.stats.backoffSleepMS[k] += v
s.mu.stats.backoffSleepMS[k] += v
}
for k, v := range bo.backoffTimes {
s.stats.backoffTimes[k] += v
s.mu.stats.backoffTimes[k] += v
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.rpcStats == nil {
s.stats.rpcStats = stats
if s.mu.stats == nil {
return
}
if s.mu.stats.rpcStats == nil {
s.mu.stats.rpcStats = stats
return
}
for k, v := range stats {
stat, ok := s.stats.rpcStats[k]
stat, ok := s.mu.stats.rpcStats[k]
if !ok {
s.stats.rpcStats[k] = v
s.mu.stats.rpcStats[k] = v
continue
}
stat.count += v.count
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,5 +316,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
snapshot.recordBackoffInfo(bo)
snapshot.recordBackoffInfo(bo)
expect := "Get:{num_rpc:4, total_time:2.002s},txnLockFast_backoff:{num:2, total_time:60 ms}"
c.Assert(snapshot.stats.String(), Equals, expect)
c.Assert(snapshot.mu.stats.String(), Equals, expect)
}

0 comments on commit 8976ffe

Please sign in to comment.