Skip to content

Commit

Permalink
Add metric for catchup failure and increase catchup time to 2 minutes (
Browse files Browse the repository at this point in the history
…#91)

Previously the default max catch-up time was 1 minute, which wasn't
always enough to recover from a pod restart. This change:
1. Doubles the max catch-up time to 2 minutes.
2. Increases the dedup key TTL from 2m to 2.5m so that it covers the
catch-up time without greatly increasing the number of keys
(25% increase in total dedup keys, same create/expire rate).
3. Adds a resume-failure metric (the `resume_gap_seconds` histogram,
observed with `status="failed"`) that is recorded each time it
cannot catch up from where it left off.
4. Improves logging by adding the age of the last processed timestamp,
in seconds.
  • Loading branch information
eparker-tulip authored Dec 16, 2024
1 parent e6eaeea commit a74d8ba
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 18 deletions.
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

buildGoModule {
pname = "oplogtoredis";
version = "3.8.0";
version = "3.8.1";
src = builtins.path { path = ./.; };

postInstall = ''
Expand Down
4 changes: 2 additions & 2 deletions lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type oplogtoredisConfiguration struct {
HTTPServerAddr string `default:"0.0.0.0:9000" envconfig:"HTTP_SERVER_ADDR"`
BufferSize int `default:"10000" split_words:"true"`
TimestampFlushInterval time.Duration `default:"1s" split_words:"true"`
MaxCatchUp time.Duration `default:"60s" split_words:"true"`
RedisDedupeExpiration time.Duration `default:"120s" split_words:"true"`
MaxCatchUp time.Duration `default:"120s" split_words:"true"`
RedisDedupeExpiration time.Duration `default:"150s" split_words:"true"`
RedisMetadataPrefix string `default:"oplogtoredis::" split_words:"true"`
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
Expand Down
20 changes: 10 additions & 10 deletions lib/config/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ var envTests = map[string]struct {
HTTPServerAddr: "0.0.0.0:9000",
BufferSize: 10000,
TimestampFlushInterval: time.Second,
MaxCatchUp: time.Minute,
RedisDedupeExpiration: 2 * time.Minute,
MaxCatchUp: 2 * time.Minute,
RedisDedupeExpiration: 2 * time.Minute + 30 * time.Second,
RedisMetadataPrefix: "oplogtoredis::",
},
},
Expand Down Expand Up @@ -109,42 +109,42 @@ func TestParseEnv(t *testing.T) {

func checkConfigExpectation(t *testing.T, expectedConfig *oplogtoredisConfiguration) {
if expectedConfig.MongoURL != MongoURL() {
t.Errorf("Incorrect Mongo URL. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect Mongo URL. Expected \"%s\", Got \"%s\"",
expectedConfig.MongoURL, MongoURL())
}

if expectedConfig.RedisURL != strings.Join(RedisURL()[:], "") {
t.Errorf("Incorrect Redis URL. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect Redis URL. Expected \"%s\", Got \"%s\"",
expectedConfig.RedisURL, RedisURL())
}

if expectedConfig.HTTPServerAddr != HTTPServerAddr() {
t.Errorf("Incorrect HTTPServerAddr. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect HTTPServerAddr. Expected \"%s\", Got \"%s\"",
expectedConfig.HTTPServerAddr, HTTPServerAddr())
}

if expectedConfig.BufferSize != BufferSize() {
t.Errorf("Incorrect BufferSize. Got %d, Expected %d",
t.Errorf("Incorrect BufferSize. Expected %d, Got %d",
expectedConfig.BufferSize, BufferSize())
}

if expectedConfig.TimestampFlushInterval != TimestampFlushInterval() {
t.Errorf("Incorrect TimestampFlushInterval. Got %d, Expected %d",
t.Errorf("Incorrect TimestampFlushInterval. Expected %d, Got %d",
expectedConfig.TimestampFlushInterval, TimestampFlushInterval())
}

if expectedConfig.MaxCatchUp != MaxCatchUp() {
t.Errorf("Incorrect MaxCatchUp. Got %d, Expected %d",
t.Errorf("Incorrect MaxCatchUp. Expected %d, Got %d",
expectedConfig.MaxCatchUp, MaxCatchUp())
}

if expectedConfig.RedisDedupeExpiration != RedisDedupeExpiration() {
t.Errorf("Incorrect RedisDedupeExpiration. Got %d, Expected %d",
t.Errorf("Incorrect RedisDedupeExpiration. Expected %d, Got %d",
expectedConfig.RedisDedupeExpiration, RedisDedupeExpiration())
}

if expectedConfig.RedisMetadataPrefix != RedisMetadataPrefix() {
t.Errorf("Incorrect RedisMetadataPrefix. Got \"%s\", Expected \"%s\"",
t.Errorf("Incorrect RedisMetadataPrefix. Expected \"%s\", Got \"%s\"",
expectedConfig.RedisMetadataPrefix, RedisMetadataPrefix())
}
}
27 changes: 22 additions & 5 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ var (
Name: "last_received_staleness",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
}, []string{"ordinal"})

metricOplogResumeGap = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "otr",
Subsystem: "oplog",
Name: "resume_gap_seconds",
Help: "Histogram recording the gap in time that a tailing resume had to catchup and whether it was successful or not.",
Buckets: []float64{1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000},
}, []string{"status"})
)

func init() {
Expand Down Expand Up @@ -440,25 +448,34 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry
// Get the earliest "last processed time" for each shard. This assumes that the number of shards is constant.
ts, tsTime, redisErr := redispub.FirstLastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, maxOrdinal)

gapSeconds := time.Since(tsTime) / time.Second

if redisErr == nil {
// we have a last write time, check that it's not too far in the
// past
// we have a last write time, check that it's not too far in the past
if tsTime.After(time.Now().Add(-1 * tailer.MaxCatchUp)) {
log.Log.Infof("Found last processed timestamp, resuming oplog tailing from %d", tsTime.Unix())
log.Log.Infof("Found last processed timestamp, resuming oplog tailing",
"timestamp", tsTime.Unix(),
"age_seconds", gapSeconds)
metricOplogResumeGap.WithLabelValues("success").Observe(float64(gapSeconds))
return ts
}

log.Log.Warnf("Found last processed timestamp, but it was too far in the past (%d). Will start from end of oplog", tsTime.Unix())
log.Log.Warnw("Found last processed timestamp, but it was too far in the past. Will start from end of oplog",
"timestamp", tsTime.Unix(),
"age_seconds", gapSeconds)
}

if (redisErr != nil) && (redisErr != redis.Nil) {
log.Log.Errorw("Error querying Redis for last processed timestamp. Will start from end of oplog.",
"error", redisErr)
}

metricOplogResumeGap.WithLabelValues("failed").Observe(float64(gapSeconds))

mongoOplogEndTimestamp, mongoErr := getTimestampOfLastOplogEntry()
if mongoErr == nil {
log.Log.Infof("Starting tailing from end of oplog (timestamp %d)", mongoOplogEndTimestamp.T)
log.Log.Infow("Starting tailing from end of oplog",
"timestamp", mongoOplogEndTimestamp.T)
return *mongoOplogEndTimestamp
}

Expand Down

0 comments on commit a74d8ba

Please sign in to comment.