OplogToRedis: Read Parallelism (#72)
This is a follow-on to #70.

This adds read parallelism in addition to the write parallelism. Every read routine has its own Mongo session and consumes the entire oplog. When processing a message, the same hash that routes the message to the appropriate write routine is first compared to the ordinal of the read routine; if it doesn't match, the message is simply discarded. The result is that every message is processed by exactly one read routine.

The read routines need not correspond one-to-one with the write routines.
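
For concreteness, here is a minimal standalone sketch of the routing decision (the helper mirrors assignToShard added in lib/oplog/tail.go below; the surrounding program and constants are illustrative, not part of this commit):

package main

import "fmt"

// Mirrors the assignToShard helper added in lib/oplog/tail.go: Go's % can be
// negative for a negative key, so the result is normalized into [0, shardCount).
func assignToShard(key, shardCount int) int {
	return (key%shardCount + shardCount) % shardCount
}

func main() {
	const readParallelism, writeParallelism = 2, 3
	key := -7 // ParallelismKey is hash-derived and may be negative

	for readOrdinal := 0; readOrdinal < readParallelism; readOrdinal++ {
		if assignToShard(key, readParallelism) != readOrdinal {
			fmt.Printf("reader %d discards the message\n", readOrdinal)
			continue
		}
		// Exactly one reader reaches this point; it then picks a writer shard.
		fmt.Printf("reader %d keeps the message and routes it to writer %d\n",
			readOrdinal, assignToShard(key, writeParallelism))
	}
}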
alex-goodisman authored May 2, 2024
1 parent c32c6f7 commit 9bff003
Showing 5 changed files with 117 additions and 63 deletions.
2 changes: 2 additions & 0 deletions integration-tests/fault-injection/redisStopStart_test.go
@@ -61,13 +61,15 @@ func TestRedisStopStart(t *testing.T) {

nSuccess := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{
"status": "sent",
"idx": "0",
})
if nSuccess != 100 {
t.Errorf("Metric otr_redispub_processed_messages(status: sent) = %d, expected 100", nSuccess)
}

nPermFail := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{
"status": "failed",
"idx": "0",
})
if nPermFail != 0 {
t.Errorf("Metric otr_redispub_processed_messages(status: failed) = %d, expected 0", nPermFail)
9 changes: 9 additions & 0 deletions lib/config/main.go
@@ -23,6 +23,7 @@ type oplogtoredisConfiguration struct {
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
WriteParallelism int `default:"1" split_words:"true"`
ReadParallelism int `default:"1" split_words:"true"`
}

var globalConfig *oplogtoredisConfiguration
@@ -140,6 +141,14 @@ func WriteParallelism() int {
return globalConfig.WriteParallelism
}

// ReadParallelism controls how many parallel read loops are run. Each read loop has its own Mongo
// connection. Each loop consumes the entire oplog, but hashes the database name and discards any
// messages that don't match the loop's ordinal (effectively the same sharding algorithm the write loops use).
// The healthz endpoint reports failure if any one of the loops dies.
func ReadParallelism() int {
return globalConfig.ReadParallelism
}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
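Not part of the diff: a small sketch of how the new field is expected to surface as configuration, assuming envconfig's documented split_words behavior (the exact variable name also depends on the prefix this project passes to envconfig.Process):

package main

import (
	"fmt"
	"os"

	"github.com/kelseyhightower/envconfig"
)

type cfg struct {
	WriteParallelism int `default:"1" split_words:"true"`
	ReadParallelism  int `default:"1" split_words:"true"`
}

func main() {
	// With an empty prefix, split_words maps ReadParallelism to READ_PARALLELISM.
	os.Setenv("READ_PARALLELISM", "4") // four read loops, each with its own Mongo connection
	var c cfg
	if err := envconfig.Process("", &c); err != nil {
		panic(err)
	}
	fmt.Println(c.ReadParallelism, c.WriteParallelism) // 4 1
}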
52 changes: 38 additions & 14 deletions lib/oplog/tail.go
@@ -6,6 +6,7 @@ package oplog
import (
"context"
"errors"
"strconv"
"strings"
"sync"
"time"
@@ -102,12 +103,12 @@ var (
Help: "Oplog entries filtered by denylist",
}, []string{"database"})

metricLastReceivedStaleness = promauto.NewGauge(prometheus.GaugeOpts{
metricLastReceivedStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "otr",
Subsystem: "oplog",
Name: "last_received_staleness",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
})
}, []string{"ordinal"})
)

func init() {
@@ -116,7 +117,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
childStopC := make(chan bool)
wasStopped := false

@@ -128,7 +129,7 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool)

for {
log.Log.Info("Starting oplog tailing")
tailer.tailOnce(out, childStopC)
tailer.tailOnce(out, childStopC, readOrdinal, readParallelism)
log.Log.Info("Oplog tailing ended")

if wasStopped {
@@ -140,9 +141,7 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool)
}
}

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool) {
parallelismSize := len(out)

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
@@ -151,7 +150,7 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b

oplogCollection := session.Client().Database("local").Collection("oplog.rs")

startTime := tailer.getStartTime(parallelismSize-1, func() (*primitive.Timestamp, error) {
startTime := tailer.getStartTime(len(out)-1, func() (*primitive.Timestamp, error) {
// Get the timestamp of the last entry in the oplog (as a position to
// start from if we don't have a last-written timestamp from Redis)
var entry rawOplogEntry
@@ -206,15 +205,31 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
continue
}

ts, pubs := tailer.unmarshalEntry(rawData, tailer.Denylist)
ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal)

if ts != nil {
lastTimestamp = *ts
}

// we only want to send metrics data once for the whole batch
metricsDataSent := false

for _, pub := range pubs {
if pub != nil {
outIdx := (pub.ParallelismKey%parallelismSize + parallelismSize) % parallelismSize
inIdx := assignToShard(pub.ParallelismKey, readParallelism)
if inIdx != readOrdinal {
// discard this publication
continue
}

// send metrics data only if we didn't discard all the publications due to sharding
if !metricsDataSent && sendMetricsData != nil {
metricsDataSent = true
sendMetricsData()
}

// inIdx and outIdx may differ when the numbers of read and write routines differ
outIdx := assignToShard(pub.ParallelismKey, len(out))
out[outIdx] <- pub
} else {
log.Log.Error("Nil Redis publication")
@@ -340,7 +355,7 @@ func closeCursor(cursor *mongo.Cursor) {
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
@@ -357,16 +372,16 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
status := "ignored"
database := "(no database)"
messageLen := float64(len(rawData))
metricLastReceivedStaleness.Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))

defer func() {
sendMetricsData = func() {
// TODO: remove these in a future version
metricOplogEntriesReceived.WithLabelValues(database, status).Inc()
metricOplogEntriesReceivedSize.WithLabelValues(database).Add(messageLen)

metricOplogEntriesBySize.WithLabelValues(database, status).Observe(messageLen)
metricMaxOplogEntryByMinute.Report(messageLen, database, status)
}()
metricLastReceivedStaleness.WithLabelValues(strconv.Itoa(readOrdinal)).Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))
}

if len(entries) > 0 {
database = entries[0].Database
@@ -529,3 +544,12 @@ func parseNamespace(namespace string) (string, string) {

return database, collection
}

// assignToShard determines which shard should process a given key.
// The key is an integer generated from random bytes, which means it can be negative.
// In Go, A%B when A is negative produces a negative result, which is inadequate as an
// array index. We fix this by doing (A%B+B)%B, which is identical to A%B for positive A
// and produces the expected result for negative A.
func assignToShard(key int, shardCount int) int {
return (key%shardCount + shardCount) % shardCount
}
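
A worked instance of the wrap-around described in the comment above (illustrative only, not part of the commit):

package main

import "fmt"

func main() {
	// Go's % keeps the dividend's sign, so a raw key%shardCount can be negative.
	key, shardCount := -7, 4
	fmt.Println(key % shardCount)                           // -3: unusable as a slice index
	fmt.Println((key%shardCount + shardCount) % shardCount) // 1: what assignToShard returns
	fmt.Println((9%shardCount + shardCount) % shardCount)   // 1: unchanged for non-negative keys
}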
18 changes: 12 additions & 6 deletions lib/redispub/publisher.go
@@ -45,8 +45,8 @@ var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "redispub",
Name: "processed_messages",
Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them",
}, []string{"status"})
Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them and publish function index",
}, []string{"status", "idx"})

var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "otr",
@@ -103,8 +103,14 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
publishFns = append(publishFns, publishFn)
}

metricSendFailed := metricSentMessages.WithLabelValues("failed")
metricSendSuccess := metricSentMessages.WithLabelValues("sent")
publishFnsCount := len(publishFns)
metricsSendFailed := make([]prometheus.Counter, publishFnsCount)
metricsSendSuccess := make([]prometheus.Counter, publishFnsCount)
for i := 0; i < publishFnsCount; i++ {
idx := strconv.Itoa(i)
metricsSendFailed[i] = metricSentMessages.WithLabelValues("failed", idx)
metricsSendSuccess[i] = metricSentMessages.WithLabelValues("sent", idx)
}

for {
select {
@@ -119,12 +125,12 @@
log.Log.Debugw("Published to", "idx", i)

if err != nil {
metricSendFailed.Inc()
metricsSendFailed[i].Inc()
log.Log.Errorw("Permanent error while trying to publish message; giving up",
"error", err,
"message", p)
} else {
metricSendSuccess.Inc()
metricsSendSuccess[i].Inc()

// We want to make sure we do this *after* we've successfully published
// the messages
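Not part of the diff: a minimal sketch of what the per-index partitioning means for metric consumers, using the standard client_golang testutil package (the counter below is a hypothetical stand-in for the package-private metricSentMessages):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-in for otr_redispub_processed_messages with the new "idx" label.
	processed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "otr",
		Subsystem: "redispub",
		Name:      "processed_messages",
		Help:      "Messages processed by Redis publisher.",
	}, []string{"status", "idx"})

	processed.WithLabelValues("sent", "0").Inc()
	processed.WithLabelValues("sent", "1").Inc()

	// Each publish-function index is now its own series, so dashboards and tests
	// must select (or sum over) the idx label.
	fmt.Println(testutil.ToFloat64(processed.WithLabelValues("sent", "0"))) // 1
	fmt.Println(testutil.ToFloat64(processed.WithLabelValues("sent", "1"))) // 1
}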
99 changes: 56 additions & 43 deletions main.go
@@ -41,31 +41,16 @@ func main() {
panic("Error parsing environment variables: " + err.Error())
}

mongoSession, err := createMongoClient()
if err != nil {
panic("Error initializing oplog tailer: " + err.Error())
}
defer func() {
mongoCloseCtx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

mongoCloseErr := mongoSession.Disconnect(mongoCloseCtx)
if mongoCloseErr != nil {
log.Log.Errorw("Error closing Mongo client", "error", mongoCloseErr)
}
}()
log.Log.Info("Initialized connection to Mongo")

parallelism := config.WriteParallelism()
aggregatedRedisClients := make([][]redis.UniversalClient, parallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, parallelism)
stopRedisPubs := make([]chan bool, parallelism)
writeParallelism := config.WriteParallelism()
aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
stopRedisPubs := make([]chan bool, writeParallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
denylist := sync.Map{}

for i := 0; i < config.WriteParallelism(); i++ {
for i := 0; i < writeParallelism; i++ {
redisClients, err := createRedisClients()
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
@@ -121,25 +106,49 @@ func main() {
})
}

stopOplogTail := make(chan bool)
waitGroup.Add(1)
go func() {
tailer := oplog.Tailer{
MongoClient: mongoSession,
RedisClients: aggregatedRedisClients[0], // the tailer coroutine needs a redis client for determining start timestamp
// it doesn't really matter which one since this isn't a meaningful amount of load, so just take the first one
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
Denylist: &denylist,
readParallelism := config.ReadParallelism()

stopOplogTails := make([]chan bool, readParallelism)
aggregatedMongoSessions := make([]*mongo.Client, readParallelism)
for i := 0; i < readParallelism; i++ {
mongoSession, err := createMongoClient()
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing oplog tailer: %s", i, err.Error()))
}
tailer.Tail(aggregatedRedisPubs, stopOplogTail)
defer func(i int) {
mongoCloseCtx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}()
mongoCloseErr := mongoSession.Disconnect(mongoCloseCtx)
if mongoCloseErr != nil {
log.Log.Errorw("Error closing Mongo client", "i", i, "error", mongoCloseErr)
}
}(i)
log.Log.Infow("Initialized connection to Mongo", "i", i)
aggregatedMongoSessions[i] = mongoSession

stopOplogTail := make(chan bool)
stopOplogTails[i] = stopOplogTail

waitGroup.Add(1)
go func(i int) {
tailer := oplog.Tailer{
MongoClient: mongoSession,
RedisClients: aggregatedRedisClients[0], // the tailer goroutine needs a redis client for determining start timestamp
// it doesn't really matter which one since this isn't a meaningful amount of load, so just take the first one
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
Denylist: &denylist,
}
tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}(i)
}

// Start one more goroutine for the HTTP server
httpServer := makeHTTPServer(aggregatedRedisClients, mongoSession, &denylist)
httpServer := makeHTTPServer(aggregatedRedisClients, aggregatedMongoSessions, &denylist)
go func() {
httpErr := httpServer.ListenAndServe()
if httpErr != nil {
@@ -165,7 +174,9 @@ func main() {
log.Log.Warnf("Exiting cleanly due to signal %s. Interrupt again to force unclean shutdown.", sig)
signal.Reset()

stopOplogTail <- true
for _, stopOplogTail := range stopOplogTails {
stopOplogTail <- true
}
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
}
@@ -247,7 +258,7 @@ func createRedisClients() ([]redis.UniversalClient, error) {
return ret, nil
}

func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Client, denylistMap *sync.Map) *http.Server {
func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, aggregatedMongos []*mongo.Client, denylistMap *sync.Map) *http.Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
@@ -266,12 +277,14 @@ func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Cl
ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

mongoErr := mongo.Ping(ctx, readpref.Primary())
mongoOK := mongoErr == nil

if !mongoOK {
log.Log.Errorw("Error connecting to Mongo during healthz check",
"error", mongoErr)
mongoOK := true
for i, mongo := range aggregatedMongos {
mongoErr := mongo.Ping(ctx, readpref.Primary())
if mongoErr != nil {
mongoOK = false
log.Log.Errorw("Error connecting to Mongo during healthz check",
"i", i, "error", mongoErr)
}
}

if mongoOK && redisOK {
