Eliminate semaphore contention #435
Conversation
Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>
@dgtony, this is great! Thanks for diving so deep. I am back at the desk this week. I gave it a skim but need to look deeper tomorrow.
Thanks, @dgtony, this is a big improvement. I left a couple questions, but LGTM!
```go
defer wg.Done()
p, err := addr.ValueForProtocol(ma.P_P2P)
pid, ok, err := s.callablePeer(addr)
```
Nice cleanups 🤙
```go
log.Debugf("received %d records in log %s from %s", len(l.Records), logID, pid)

if l.Log != nil && len(l.Log.Addrs) > 0 {
	if err = s.net.store.AddAddrs(tid, logID, addrsFromProto(l.Log.Addrs), pstore.PermanentAddrTTL); err != nil {
```
Is the idea in using `AddAddrs` and `AddPubKey` (below) instead of `AddLog` to be more explicit? Looks like a good change, just curious about the motivation.
Mostly to avoid moving the head of the log. `getRecords` now just fetches the records and, apart from adding new addresses and keys, is stateless; the real processing and state update happen in `putRecord`.
net/client.go (outdated)

```diff
@@ -315,49 +337,43 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
 	Body: body,
 }

 logErr := func(addr ma.Multiaddr, f func(addr ma.Multiaddr) error) {
```
Could be a package-level func?
Sure.
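The suggestion above is to hoist the `logErr` closure out of `pushRecord` into a package-level helper. A minimal sketch of what that might look like, with a plain string standing in for `ma.Multiaddr` so the example stays dependency-free (names and signatures here are illustrative, not the actual go-threads code):

```go
package main

import (
	"fmt"
	"log"
)

// logErr runs f against addr and logs (rather than returns) any failure.
// Logging instead of returning suits a best-effort fan-out push, where one
// unreachable address should not abort pushes to the others.
func logErr(addr string, f func(addr string) error) {
	if err := f(addr); err != nil {
		log.Printf("push to %s failed: %v", addr, err)
	}
}

func main() {
	addrs := []string{"/p2p/peerA", "/p2p/peerB"}
	for _, a := range addrs {
		logErr(a, func(addr string) error {
			if addr == "/p2p/peerB" {
				return fmt.Errorf("connection refused") // simulated dial failure
			}
			return nil
		})
	}
}
```

As a package-level func it can be reused by any server method that fans out to multiple addresses, at the cost of threading the address through explicitly.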
```diff
@@ -826,11 +824,25 @@ func (n *net) Validate(id thread.ID, token thread.Token, readOnly bool) (thread.
 	return token.Validate(n.getPrivKey())
 }

 // getConnector returns the connector tied to the thread if it exists
 func (n *net) addConnector(id thread.ID, conn *app.Connector) {
```
Ooph, good catch on the unprotected map access 🤦
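The "unprotected map access" being fixed is the classic Go hazard: concurrent writes to a plain map are a data race and can crash the process. A hedged sketch of the fix implied here, with a `string` thread ID and a stub `Connector` standing in for the real `thread.ID` and `app.Connector` types:

```go
package main

import (
	"fmt"
	"sync"
)

// Connector is a stand-in for app.Connector.
type Connector struct{ name string }

type net struct {
	mu         sync.Mutex
	connectors map[string]*Connector // keyed by thread ID
}

// addConnector registers the connector for a thread under the mutex, so
// concurrent registrations cannot corrupt the map.
func (n *net) addConnector(id string, conn *Connector) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.connectors[id] = conn
}

// getConnector returns the connector tied to the thread if it exists.
func (n *net) getConnector(id string) (*Connector, bool) {
	n.mu.Lock()
	defer n.mu.Unlock()
	c, ok := n.connectors[id]
	return c, ok
}

func main() {
	n := &net{connectors: make(map[string]*Connector)}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			n.addConnector(fmt.Sprintf("thread-%d", i), &Connector{})
		}(i)
	}
	wg.Wait()
	fmt.Println(len(n.connectors)) // 10
}
```

An `sync.RWMutex` would also work if reads heavily dominate writes; the plain mutex keeps the sketch simple.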
```go
// from the record possibly won't reach reducers/listeners or even get dispatched.
// 2. Rollback log head to the previous record. In this case record handling will be retried until
//    success, but reducers must guarantee their idempotence and there is a chance of getting stuck
//    with a bad event and not making any progress at all.
```
Good notes! The original thinking here is in line with your warning in (2): if it failed the first time, chances are it will fail again.
```go
// Generally broadcasting should not block for too long, i.e. we have to run it
// under the semaphore to ensure consistent order seen by the listeners. Record
// bursts could be overcome by adjusting listener buffers (EventBusCapacity).
```
👍
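The ordering argument in the comment above can be sketched in miniature: broadcast inside the per-thread critical section so every listener sees records in the same order, and give each listener a buffer (cf. `EventBusCapacity`) so the broadcast itself rarely blocks. All names below are illustrative, not the go-threads bus implementation:

```go
package main

import "fmt"

type bus struct {
	sem       chan struct{} // per-thread semaphore, capacity 1
	listeners []chan int    // buffered listener channels
}

func newBus(listenerBuf, nListeners int) *bus {
	b := &bus{sem: make(chan struct{}, 1)}
	for i := 0; i < nListeners; i++ {
		b.listeners = append(b.listeners, make(chan int, listenerBuf))
	}
	return b
}

// publish broadcasts under the semaphore, so concurrent publishers cannot
// interleave and all listeners observe the same record order. A send blocks
// only when a listener's buffer is full, i.e. during a sustained burst.
func (b *bus) publish(rec int) {
	b.sem <- struct{}{}
	defer func() { <-b.sem }()
	for _, l := range b.listeners {
		l <- rec
	}
}

func main() {
	b := newBus(8, 2)
	for r := 1; r <= 3; r++ {
		b.publish(r)
	}
	fmt.Println(<-b.listeners[0], <-b.listeners[1]) // both listeners see record 1 first
}
```

The trade-off named in the review comment is visible here: a slow listener with a full buffer stalls the publisher while it holds the semaphore, which is why buffer sizing matters for bursts.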
```go
		lock.Lock()
		fetchedRcs = append(fetchedRcs, recs)
		lock.Unlock()
	}(lg)
}
wg.Wait()

// maybe we should preliminarily deduplicate records?
```
As in, check for duplicates in `fetchedRcs`?
Yes, in order to remove redundant processing of already-seen records.
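The deduplication being discussed is straightforward to sketch: after the per-log fetch goroutines finish, drop any record already collected before handing the batch to processing. Here a string ID stands in for a record CID; the types and helper name are illustrative:

```go
package main

import "fmt"

type record struct{ id string }

// dedupe flattens per-log batches into a single slice, keeping only the
// first occurrence of each record ID. Order within and across batches is
// otherwise preserved.
func dedupe(batches [][]record) []record {
	seen := make(map[string]struct{})
	var out []record
	for _, batch := range batches {
		for _, r := range batch {
			if _, ok := seen[r.id]; ok {
				continue // already fetched via another log or peer
			}
			seen[r.id] = struct{}{}
			out = append(out, r)
		}
	}
	return out
}

func main() {
	fetchedRcs := [][]record{
		{{"a"}, {"b"}},
		{{"b"}, {"c"}}, // "b" arrived twice
	}
	fmt.Println(len(dedupe(fetchedRcs))) // 3
}
```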
Oop, I left you with a conflict... I think just from this tiny change: f4a9ab7#diff-2fef49f8571a7dfa93a40dec78adc852
Here is a little update, please take a look. We've been testing this branch over the last week and discovered that in the presence of a large number of threads, semaphore protection performs better at the thread level than at the log level. So two types of semaphores are introduced: the first protects against concurrent pulls of the same thread, and the other guards updates of local thread state.
Cool, looks reasonable. @jsign, since you worked on the initial concurrency model, could you give this a skim?
Well, a lot has happened since then, so I'm not very familiar with all the code that has changed in the meantime.
Cool, makes sense. Even just a 2-minute skim as a sanity check.
Looks good.
Every time I see `TryAcquire` defaulting to doing nothing if the lock is already taken, making that behavior indistinguishable from a real update seems a bit weird to me (btw, it was always that way).
Maybe something for the future is to return an extra bool indicating whether real work was done or the call was ignored, since after a successful return the caller can't really know whether something happened or it got unlucky and should try again soon.
Anyway, it seems that if this hasn't happened yet, it's because callers don't want that guarantee. Or maybe it's something callers aren't clearly aware of, since in general when you call a method you wouldn't expect that it might do nothing and that you should retry later (without any way to detect that). This magical "ignore" style looks like it might be error-prone.
Just sharing some thoughts here; not really related to this PR change.
Great, good stuff to think about 👍 @dgtony, are you still testing, or do you consider this mergeable?
We're done with testing; the latest version looks stable. Regarding the silent behavior: as you said, a problem may arise in the future if some caller assumes synchronism, e.g. that the thread was definitely updated after the call returns.
Yes, until there's some good argument to include a signal that the call was skipped, I think it's OK as it is. 👍🏼
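The API jsign floats above, a try-style pull that reports whether it actually ran or was silently skipped because another pull holds the per-thread semaphore, can be sketched with a 1-slot channel as the semaphore. Names and signatures are illustrative, not the go-threads API:

```go
package main

import "fmt"

type puller struct {
	sem chan struct{} // per-thread semaphore, capacity 1
}

// pullThread attempts a non-blocking acquire. On contention it returns
// didWork=false instead of silently doing nothing, so the caller can decide
// to retry later; otherwise it runs the pull and releases the semaphore.
func (p *puller) pullThread(pull func() error) (didWork bool, err error) {
	select {
	case p.sem <- struct{}{}:
		defer func() { <-p.sem }()
		return true, pull()
	default:
		return false, nil // another pull of this thread is in flight
	}
}

func main() {
	p := &puller{sem: make(chan struct{}, 1)}
	did, _ := p.pullThread(func() error { return nil })
	fmt.Println(did) // true: no contention, the pull actually ran
}
```

The extra bool is exactly the "signal that the call was skipped" discussed above; callers that don't care can ignore it, so it's a backward-friendly extension.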
Running go-threads on high-degree nodes, we detected a scalability problem with the current approach of protecting thread operations with semaphores. Currently a semaphore remains acquired by an active process during the entire flow, including cyclic fetches of record blocks from the network, app-level validation, etc. This can result in the semaphore being held for a long time. Worse, several processes trying to acquire the same semaphore become naturally queued, further amplifying waiting times.
To give an idea of how bad it can get, here are the quantiles of per-thread semaphore acquisition time, measured in a sliding window over a day and a week respectively:
In the end this prevents normal system operation and leads to many requests failing with exceeded deadlines.
Here we propose a solution: decouple long-running operations from synchronized sections. Network operations and record validation are performed without acquiring the thread semaphore, while concurrent log pulls are still prevented by per-thread-per-log semaphores. Per-thread semaphores now protect only local thread state mutations, record processing, and internal bus broadcasting, to preserve the order of the records.
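The shape of the proposed design can be sketched in a few lines: a pull semaphore acquired non-blockingly to prevent duplicate concurrent pulls of the same thread, the long network fetch done with no state lock held, and a separate state semaphore serializing only the short local update. This is a hedged illustration of the structure, not the actual go-threads code; all names are made up:

```go
package main

import "fmt"

type threadSem struct {
	pull  chan struct{} // guards against concurrent pulls of the same thread
	state chan struct{} // guards local head/state mutations and broadcasting
}

// pull skips if another pull of this thread is already running, fetches
// records with no state lock held, then takes the state semaphore only for
// the short apply step.
func pull(ts *threadSem, fetch func() []int, apply func([]int)) bool {
	select {
	case ts.pull <- struct{}{}:
	default:
		return false // duplicate pull of this thread: skip
	}
	defer func() { <-ts.pull }()

	recs := fetch() // long-running network I/O, outside any state lock

	ts.state <- struct{}{} // short critical section: mutate state, broadcast
	apply(recs)
	<-ts.state
	return true
}

func main() {
	ts := &threadSem{pull: make(chan struct{}, 1), state: make(chan struct{}, 1)}
	var head []int
	ok := pull(ts,
		func() []int { return []int{1, 2} },
		func(rs []int) { head = append(head, rs...) })
	fmt.Println(ok, head) // true [1 2]
}
```

The key property is that slow network fetches no longer serialize behind each other on the thread semaphore; only the cheap state updates do.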