Skip to content

Commit

Permalink
Add retry to recover from transient errors when checking the tens of …
Browse files Browse the repository at this point in the history
…thousands of series entries
  • Loading branch information
robskillington committed Jun 7, 2022
1 parent 28e54e1 commit 18a5169
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 28 deletions.
91 changes: 69 additions & 22 deletions src/dbnode/integration/graphite_find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,16 @@ local:
}()

// Check each level of the tree can answer expected queries.
type checkResult struct {
leavesVerified int
}
type checkFailure struct {
expected graphiteFindResults
actual graphiteFindResults
failMsg string
}
var (
verifyFindQueries func(node *graphiteNode, level int)
verifyFindQueries func(node *graphiteNode, level int) (checkResult, *checkFailure, error)
parallelVerifyFindQueries func(node *graphiteNode, level int)
checkedSeriesAbort = atomic.NewBool(false)
numSeriesChecking = uint64(len(generateSeries))
Expand All @@ -288,22 +296,58 @@ local:
)
workerPool.Init()
parallelVerifyFindQueries = func(node *graphiteNode, level int) {
// Verify this node at level.
wg.Add(1)
workerPool.Go(func() {
verifyFindQueries(node, level)
wg.Done()
defer wg.Done()

if checkedSeriesAbort.Load() {
// Do not execute if aborted.
return
}

var (
result checkResult
failure = &checkFailure{}
err = fmt.Errorf("initial error")
)
for attempt := 0; (failure != nil || err != nil) && attempt < 2; attempt++ {
if attempt > 0 {
// Retry transient errors (should add a strict mode for this test
// to avoid allowing transient errors too).
time.Sleep(5 * time.Millisecond)
}
result, failure, err = verifyFindQueries(node, level)
}
if failure == nil && err == nil {
// Account for series checked (for progress report).
checkedSeries.Add(uint64(result.leavesVerified))
return
}

// Bail parallel execution (failed require/assert won't stop execution).
if checkedSeriesAbort.CAS(false, true) {
switch {
case failure != nil:
// Assert an error result and log once.
assert.Equal(t, failure.expected, failure.actual, failure.failMsg)
log.Error("aborting checks due to mismatch")
case err != nil:
assert.NoError(t, err)
log.Error("aborting checks due to error")
default:

}
}
})

// Verify children of children.
for _, child := range node.children {
parallelVerifyFindQueries(child, level+1)
}
}
verifyFindQueries = func(node *graphiteNode, level int) {
if checkedSeriesAbort.Load() {
// Do not execute if aborted.
return
}
verifyFindQueries = func(node *graphiteNode, level int) (checkResult, *checkFailure, error) {
var r checkResult

// Write progress report if progress made.
checked := checkedSeries.Load()
Expand Down Expand Up @@ -332,22 +376,28 @@ local:
require.NoError(t, err)

res, err := httpClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)
if err != nil {
return r, nil, err
}
if res.StatusCode != http.StatusOK {
return r, nil, fmt.Errorf("bad response code: expected=%d, actual=%d",
http.StatusOK, res.StatusCode)
}

defer res.Body.Close()

// Compare results.
var actual graphiteFindResults
require.NoError(t, json.NewDecoder(res.Body).Decode(&actual))
if err := json.NewDecoder(res.Body).Decode(&actual); err != nil {
return r, nil, err
}

expected := make(graphiteFindResults, 0, len(node.children))
leaves := 0
for _, child := range node.children {
leaf := 0
if child.isLeaf {
leaf = 1
leaves++
r.leavesVerified++
}
expected = append(expected, graphiteFindResult{
Text: child.name,
Expand All @@ -364,17 +414,14 @@ local:
failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n",
xtest.Diff(xtest.MustPrettyJSONObject(t, expected),
xtest.MustPrettyJSONObject(t, actual)))
// Bail parallel execution (failed require/assert won't stop execution).
if checkedSeriesAbort.CAS(false, true) {
// Assert an error result and log once.
assert.Equal(t, expected, actual, failMsg)
log.Error("aborting checks")
}
return
return r, &checkFailure{
expected: expected,
actual: actual,
failMsg: failMsg,
}, nil
}

// Account for series checked (for progress report).
checkedSeries.Add(uint64(leaves))
return r, nil, nil
}

// Check all top level entries and recurse.
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/coldflush.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool {
m.Unlock()
}()

debugLog := m.log.Check(zapcore.DebugLevel, "cold flush run")
if debugLog != nil {
debugLog.Write(zap.String("status", "starting cold flush"), zap.Time("time", t.ToTime()))

if log := m.log.Check(zapcore.DebugLevel, "cold flush run start"); log != nil {
log.Write(zap.Time("time", t.ToTime()))
}

// NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks.
Expand All @@ -132,9 +132,9 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool {
zap.Time("time", t.ToTime()), zap.Error(err))
})
}

if debugLog != nil {
debugLog.Write(zap.String("status", "completed cold flush"), zap.Time("time", t.ToTime()))
if log := m.log.Check(zapcore.DebugLevel, "cold flush run complete"); log != nil {
log.Write(zap.Time("time", t.ToTime()))
}

return true
Expand Down

0 comments on commit 18a5169

Please sign in to comment.