-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OplogToRedis: Run writer routines in parallel for each redis client. #75
Conversation
lib/oplog/tail.go
Outdated
@@ -116,7 +116,7 @@ func init() { | |||
|
|||
// Tail begins tailing the oplog. It doesn't return unless it receives a message | |||
// on the stop channel, in which case it wraps up its work and then returns. | |||
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) { | |||
func (tailer *Tailer) Tail(out [][]chan<- *redispub.Publication, stop <-chan bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this 2d array of channels is getting complicated-enough that the semantics aren't obvious. could we document it, or use type aliases, or something similar that would make it easier to understand?
main.go
Outdated
|
||
waitGroup.Add(1) | ||
|
||
// We crate two goroutines: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol that was there, i'll do a spelling pass
redisPubsAggregationEntry := make([]chan<- *redispub.Publication, clientsSize) | ||
stopRedisPubsEntry := make([]chan bool, clientsSize) | ||
|
||
for j := 0; j < clientsSize; j++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general comment, this has grown somewhat hard to follow because of the nested arrays and loops, etc. it would be nice to refactor it to be a bit more abstract when we're back on firmer ground.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think type aliases will help with this somewhat, but will also add some comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the understandability changes; they helped a lot
(small reminder to bump version before merging) |
12f7215
to
c7d89ce
Compare
Revert #75 Also bump to 3.5 for all the parallelism & orchestration changes
Redo #75 in main, count through each redis client and make a separate chan and coroutine for clientsCount * ordinalCount different entries. make some changes to aggregation in order to accomodate this. in the tailer, get the [][]chan and for each incoming publication, first index by the shard ordinal (hash of db name), then iterate through the remaining chans (should be one for each redis client) and publish to all of them. at present, the publisher stream still accepts an array, but now it's only getting one (each). [fixed](66e79df) array index out of bounds issue
in main, count through each redis client and make a separate chan and coroutine for clientsCount * ordinalCount different entries. make some changes to aggregation in order to accomodate this.
in the tailer, get the [][]chan and for each incoming publication, first index by the shard ordinal (hash of db name), then iterate through the remaining chans (should be one for each redis client) and publish to all of them.
at present, the publisher stream still accepts an array, but now it's only getting one (each).