Skip to content

Commit

Permalink
read parallelism first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-goodisman committed Apr 25, 2024
1 parent 137014e commit a68b0e7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 51 deletions.
23 changes: 17 additions & 6 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
childStopC := make(chan bool)
wasStopped := false

Expand All @@ -119,7 +119,7 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool)

for {
log.Log.Info("Starting oplog tailing")
tailer.tailOnce(out, childStopC)
tailer.tailOnce(out, childStopC, readOrdinal, readParallelism)
log.Log.Info("Oplog tailing ended")

if wasStopped {
Expand All @@ -131,9 +131,7 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool)
}
}

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool) {
parallelismSize := len(out)

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
Expand Down Expand Up @@ -205,7 +203,13 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b

for _, pub := range pubs {
if pub != nil {
outIdx := (pub.ParallelismKey%parallelismSize + parallelismSize) % parallelismSize
inIdx := assignToShard(pub.ParallelismKey, readParallelism)
if inIdx != readOrdinal {
// discard
continue
}
// inIdx and outIdx may be different if there are different #s of read and write routines
outIdx := assignToShard(pub.ParallelismKey, len(out))
out[outIdx] <- pub
} else {
log.Log.Error("Nil Redis publication")
Expand Down Expand Up @@ -512,3 +516,10 @@ func parseNamespace(namespace string) (string, string) {

return database, collection
}

// assignToShard maps a parallelism key onto a shard index.
//
// For a positive shardCount the result is always in [0, shardCount). A bare
// key % shardCount is not enough: Go's % operator yields a remainder with the
// sign of the dividend, and the parallelism key can be negative. Shifting the
// remainder up by shardCount and reducing once more folds negative remainders
// back into range. shardCount must be non-zero, otherwise the division panics.
func assignToShard(key int, shardCount int) int {
	// Lies in (-shardCount, shardCount); may be negative when key is negative.
	remainder := key % shardCount
	shifted := remainder + shardCount
	return shifted % shardCount
}
104 changes: 59 additions & 45 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,10 @@ func main() {
panic("Error parsing environment variables: " + err.Error())
}

mongoSession, err := createMongoClient()
if err != nil {
panic("Error initializing oplog tailer: " + err.Error())
}
defer func() {
mongoCloseCtx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

mongoCloseErr := mongoSession.Disconnect(mongoCloseCtx)
if mongoCloseErr != nil {
log.Log.Errorw("Error closing Mongo client", "error", mongoCloseErr)
}
}()
log.Log.Info("Initialized connection to Mongo")

parallelism := config.WriteParallelism()
aggregatedRedisClients := make([][]redis.UniversalClient, parallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, parallelism)
stopRedisPubs := make([]chan bool, parallelism)
writeParallelism := config.WriteParallelism()
aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
stopRedisPubs := make([]chan bool, writeParallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
Expand All @@ -67,7 +52,7 @@ func main() {
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
}
defer func() {
defer func(i int) {
for _, redisClient := range redisClients {
redisCloseErr := redisClient.Close()
if redisCloseErr != nil {
Expand All @@ -76,7 +61,7 @@ func main() {
"i", i)
}
}
}()
}(i)
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients
Expand All @@ -96,15 +81,15 @@ func main() {

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func() {
go func(i int) {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub)
log.Log.Infow("Redis publisher completed", "i", i)
waitGroup.Done()
}()
}(i)
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub
}
Expand All @@ -121,24 +106,49 @@ func main() {
return float64(total)
})

stopOplogTail := make(chan bool)
waitGroup.Add(1)
go func() {
tailer := oplog.Tailer{
MongoClient: mongoSession,
RedisClients: aggregatedRedisClients[0], // the tailer coroutine needs a redis client for determining start timestamp
// it doesn't really matter which one since this isn't a meaningful amount of load, so just take the first one
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
// TODO create a separate env var for this
readParallelism := config.WriteParallelism()

stopOplogTails := make([]chan bool, readParallelism)
aggregatedMongoSessions := make([]*mongo.Client, readParallelism)
for i := 0; i < readParallelism; i++ {
mongoSession, err := createMongoClient()
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing oplog tailer: %s", i, err.Error()))
}
tailer.Tail(aggregatedRedisPubs, stopOplogTail)
defer func(i int) {
mongoCloseCtx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}()
mongoCloseErr := mongoSession.Disconnect(mongoCloseCtx)
if mongoCloseErr != nil {
log.Log.Errorw("Error closing Mongo client", "i", i, "error", mongoCloseErr)
}
}(i)
log.Log.Infow("Initialized connection to Mongo", "i", i)
aggregatedMongoSessions[i] = mongoSession

stopOplogTail := make(chan bool)
stopOplogTails[i] = stopOplogTail

waitGroup.Add(1)
go func(i int) {
tailer := oplog.Tailer{
MongoClient: mongoSession,
RedisClients: aggregatedRedisClients[0], // the tailer coroutine needs a redis client for determining start timestamp
// it doesn't really matter which one since this isn't a meaningful amount of load, so just take the first one
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
}
tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}(i)
}

// Start one more goroutine for the HTTP server
httpServer := makeHTTPServer(aggregatedRedisClients, mongoSession)
httpServer := makeHTTPServer(aggregatedRedisClients, aggregatedMongoSessions)
go func() {
httpErr := httpServer.ListenAndServe()
if httpErr != nil {
Expand All @@ -164,7 +174,9 @@ func main() {
log.Log.Warnf("Exiting cleanly due to signal %s. Interrupt again to force unclean shutdown.", sig)
signal.Reset()

stopOplogTail <- true
for _, stopOplogTail := range stopOplogTails {
stopOplogTail <- true
}
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
}
Expand Down Expand Up @@ -246,7 +258,7 @@ func createRedisClients() ([]redis.UniversalClient, error) {
return ret, nil
}

func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Client) *http.Server {
func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, aggregatedMongos []*mongo.Client) *http.Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -265,12 +277,14 @@ func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Cl
ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout())
defer cancel()

mongoErr := mongo.Ping(ctx, readpref.Primary())
mongoOK := mongoErr == nil

if !mongoOK {
log.Log.Errorw("Error connecting to Mongo during healthz check",
"error", mongoErr)
mongoOK := true
for _, mongo := range aggregatedMongos {
mongoErr := mongo.Ping(ctx, readpref.Primary())
mongoOK := (mongoOK && (mongoErr == nil))
if !mongoOK {
log.Log.Errorw("Error connecting to Mongo during healthz check",
"error", mongoErr)
}
}

if mongoOK && redisOK {
Expand Down

0 comments on commit a68b0e7

Please sign in to comment.