
Revert "sink-to-mysql(cdc) simplify conflict detector, prevent worklo…
Browse files Browse the repository at this point in the history
…ad skew issue (#10376)"

This reverts commit 9e0cacf.
CharlesCheung96 authored and ti-chi-bot committed Apr 18, 2024
1 parent 6930d87 commit 41ee557
Showing 4 changed files with 148 additions and 61 deletions.
33 changes: 20 additions & 13 deletions pkg/causality/conflict_detector.go
@@ -36,10 +36,11 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
// nextWorkerID is used to dispatch transactions round-robin.
nextWorkerID atomic.Int64

// Used to run a background goroutine to GC.
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
@@ -53,11 +54,12 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
}

ret.wg.Add(1)
@@ -76,23 +78,23 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
// SendToWorker is called after all dependencies are removed.
node.SendToWorker = func(workerID int64) {
node.OnResolved = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph,
// and remove related dependencies for these transactions which depend on this
// and resolve related dependencies for these transactions which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node still
// occupies related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are removed.
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
}

@@ -104,12 +106,17 @@ func (d *ConflictDetector[Worker, Txn]) Close() {

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
}()
for {
select {
case <-d.closeCh:
return
case notifyCallback := <-d.notifiedNodes.Out():
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
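For readers unfamiliar with the pattern this revert restores: `notifiedNodes` queues callbacks that a single background goroutine executes, so resolving a node never blocks the caller. Below is a minimal, self-contained sketch of that pattern, assuming nothing beyond the standard library; a plain buffered channel stands in for `chann.DrainableChann[func()]`, and all names are illustrative rather than the tiflow API.

```go
package main

import (
	"fmt"
	"sync"
)

// detector mimics the shape of ConflictDetector's notification loop:
// callbacks are queued on a channel and executed by one background
// goroutine, so node resolution never blocks the caller.
type detector struct {
	notifiedNodes chan func() // stand-in for chann.DrainableChann[func()]
	closeCh       chan struct{}
	wg            sync.WaitGroup
}

func newDetector() *detector {
	d := &detector{
		notifiedNodes: make(chan func(), 16),
		closeCh:       make(chan struct{}),
	}
	d.wg.Add(1)
	go d.runBackgroundTasks()
	return d
}

func (d *detector) runBackgroundTasks() {
	defer d.wg.Done()
	for {
		select {
		case <-d.closeCh:
			return
		case cb := <-d.notifiedNodes:
			if cb != nil {
				cb() // e.g. assign a resolved node to a worker
			}
		}
	}
}

// onNotified is what a node would call once it becomes resolvable.
func (d *detector) onNotified(cb func()) { d.notifiedNodes <- cb }

func (d *detector) close() {
	close(d.closeCh)
	d.wg.Wait()
}

func main() {
	d := newDetector()
	done := make(chan struct{})
	d.onNotified(func() {
		fmt.Println("node resolved, dispatching to a worker")
		close(done)
	})
	<-done
	d.close()
}
```

In the real code, `maybeResolve` calling `OnNotified` is what moves `assignTo` off the caller's goroutine onto the detector's background loop shown in `runBackgroundTasks` above.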
157 changes: 122 additions & 35 deletions pkg/causality/internal/node.go
@@ -14,7 +14,6 @@
package internal

import (
"fmt"
"sync"
stdatomic "sync/atomic"

@@ -49,14 +48,18 @@ type Node struct {
// Immutable fields.
id int64

// SendToWorker is used to send the node to a worker.
SendToWorker func(id workerID)
// RandWorkerID is used to select a worker randomly.
// Called when all dependencies are resolved.
OnResolved func(id workerID)
// Set the id generator to get a random ID.
RandWorkerID func() workerID
// Set the callback that the node is notified.
OnNotified func(callback func())

// Following fields are used for notifying a node's dependers lock-free.
totalDependencies int32
removedDependencies int32
totalDependencies int32
resolvedDependencies int32
removedDependencies int32
resolvedList []int64

// Following fields are protected by `mu`.
mu sync.Mutex
@@ -82,10 +85,12 @@ type Node struct {
func NewNode() (ret *Node) {
defer func() {
ret.id = genNextNodeID()
ret.SendToWorker = nil
ret.OnResolved = nil
ret.RandWorkerID = nil
ret.totalDependencies = 0
ret.resolvedDependencies = 0
ret.removedDependencies = 0
ret.resolvedList = nil
ret.assignedTo = unassigned
ret.removed = false
}()
@@ -101,20 +106,42 @@ func (n *Node) NodeID() int64 {

// DependOn implements interface internal.SlotNode.
func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
resolvedDependencies, removedDependencies := int32(0), int32(0)

depend := func(target *Node) {
if target == nil {
// For a given Node, every dependency corresponds to a target.
// If target is nil it means the dependency doesn't conflict
// with any other nodes. However, it's still necessary to track
// it because Node.tryResolve needs to count the number of
// resolved dependencies.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
return
}

if target.id == n.id {
panic("cannot depend on yourself")
panic("you cannot depend on yourself")
}

// The target node might be removed or modified in other places, for example
// after its corresponding transaction has been executed.
target.mu.Lock()
defer target.mu.Unlock()

// Add the node to the target's dependers if the target is not removed.
if target.assignedTo != unassigned {
// The target has already been assigned to a worker.
// In this case, record the worker ID in `resolvedList`, and this node
// probably can be sent to the same worker and executed sequentially.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo)
}

// Add the node to the target's dependers if the target has not been removed.
if target.removed {
// The target has already been removed.
stdatomic.AddInt32(&n.removedDependencies, 1)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
} else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist {
// Should never depend on a target redundantly.
panic("should never exist")
@@ -126,19 +153,24 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
// ?: why gen new ID here?
n.id = genNextNodeID()

// `totalDependencies` must be initialized before depending on any targets.
// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
n.removedDependencies = int32(noDependencyKeyCnt)
n.resolvedList = make([]int64, 0, n.totalDependencies)
for i := 0; i < int(n.totalDependencies); i++ {
n.resolvedList = append(n.resolvedList, unassigned)
}

for _, node := range dependencyNodes {
depend(node)
}
for i := 0; i < noDependencyKeyCnt; i++ {
depend(nil)
}

n.maybeReadyToRun()
n.maybeResolve(resolvedDependencies, removedDependencies)
}

// Remove implements interface internal.SlotNode.
// Remove will be called after the related transaction is executed.
func (n *Node) Remove() {
n.mu.Lock()
defer n.mu.Unlock()
@@ -147,8 +179,8 @@ func (n *Node) Remove() {
if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeReadyToRun()
removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeResolve(0, removedDependencies)
return true
})
n.dependers.Clear(true)
@@ -167,7 +199,7 @@ func (n *Node) Free() {
}

n.id = invalidNodeID
n.SendToWorker = nil
n.OnResolved = nil
n.RandWorkerID = nil

// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
@@ -182,41 +214,96 @@ func (n *Node) assignTo(workerID int64) bool {
defer n.mu.Unlock()

if n.assignedTo != unassigned {
// Already handled by some other guys.
// Already resolved by some other guys.
return false
}

n.assignedTo = workerID
if n.SendToWorker != nil {
n.SendToWorker(workerID)
n.SendToWorker = nil
if n.OnResolved != nil {
n.OnResolved(workerID)
n.OnResolved = nil
}

if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1)
stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.maybeResolve(resolvedDependencies, 0)
return true
})
}

return true
}

func (n *Node) maybeReadyToRun() {
if ok := n.checkReadiness(); ok {
// Assign the node to the worker directly.
n.assignTo(n.RandWorkerID())
func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) {
if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok {
if workerNum < 0 {
panic("Node.tryResolve must return a valid worker ID")
}
if n.OnNotified != nil {
// Notify the conflict detector background worker to assign the node to the worker asynchronously.
n.OnNotified(func() { n.assignTo(workerNum) })
} else {
// Assign the node to the worker directly.
n.assignTo(workerNum)
}
}
}

// tryResolve tries to find a worker to assign the node to.
// Returns (_, false) if there is a conflict,
// returns (rand, true) if there is no conflict,
// returns (N, true) if only worker N can be used.
func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies)
if resolved && assignedTo == assignedToAny {
assignedTo = n.RandWorkerID()
}
return assignedTo, resolved
}

// checkReadiness checks if all dependencies have been removed.
// Returns false if there are some conflicts, returns true if all dependencies
// are removed.
func (n *Node) checkReadiness() bool {
func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
if n.totalDependencies == 0 {
// No conflicts, can select any workers.
return true
return assignedToAny, true
}

if resolvedDependencies == n.totalDependencies {
firstDep := stdatomic.LoadInt64(&n.resolvedList[0])
hasDiffDep := false
for i := 1; i < int(n.totalDependencies); i++ {
curr := stdatomic.LoadInt64(&n.resolvedList[i])
// // Todo: simplify assign-to logic, only resolve dependency nodes after
// // corresponding transactions are executed.
// //
// // In DependOn, depend(nil) sets resolvedList[i] to assignedToAny
// // for these no-dependency keys.
// if curr == assignedToAny {
// continue
// }
if firstDep != curr {
hasDiffDep = true
break
}
}
if !hasDiffDep {
// If all dependency nodes are assigned to the same worker, we can assign
// this node to the same worker directly, and they will execute sequentially.
// On the other hand, if dependency nodes are assigned to different workers,
// this node has to wait until all dependency txns are executed and all
// dependency nodes are removed.
return firstDep, true
}
}

removedDependencies := stdatomic.LoadInt32(&n.removedDependencies)
if removedDependencies > n.totalDependencies {
panic(fmt.Sprintf("removedDependencies %d > totalDependencies %d which is not expected",
removedDependencies, n.totalDependencies))
// All dependencies are removed, so assigning the node to any worker is fine.
if removedDependencies == n.totalDependencies {
return assignedToAny, true
}
return removedDependencies == n.totalDependencies

return unassigned, false
}

func (n *Node) getOrCreateDependers() *btree.BTreeG[*Node] {
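The heart of the restored node.go logic is the three-way decision in `tryResolve`/`doResolve`: follow the single worker all dependencies resolved to, pick any worker once all dependencies are removed, or keep waiting. The hedged sketch below re-implements that decision as a standalone function so it can be read in isolation; the constants and signature are local stand-ins, not the `pkg/causality/internal` API.

```go
package main

import "fmt"

const (
	unassigned    = int64(-2) // local stand-ins for the package-level constants
	assignedToAny = int64(-1)
)

// resolve mirrors doResolve: given the dependency counters and the list of
// workers the dependencies were assigned to, decide whether the node can be
// dispatched and to which worker.
func resolve(total, resolved, removed int32, resolvedList []int64) (int64, bool) {
	if total == 0 {
		// No conflicts at all: any worker will do.
		return assignedToAny, true
	}
	if resolved == total {
		// Every dependency has been assigned somewhere. If they all went to
		// the same worker, this node can follow them and run sequentially.
		first := resolvedList[0]
		same := true
		for _, w := range resolvedList[1:] {
			if w != first {
				same = false
				break
			}
		}
		if same {
			return first, true
		}
	}
	if removed == total {
		// All dependencies have finished and been removed: any worker is fine.
		return assignedToAny, true
	}
	// Still conflicting: wait for more dependencies to resolve or be removed.
	return unassigned, false
}

func main() {
	// Both dependencies landed on worker 3, so the node follows them.
	fmt.Println(resolve(2, 2, 0, []int64{3, 3})) // 3 true
	// Dependencies split across workers 1 and 2: must wait until removed.
	fmt.Println(resolve(2, 2, 0, []int64{1, 2})) // -2 false
	fmt.Println(resolve(2, 2, 2, []int64{1, 2})) // -1 true
}
```

The three cases in `main` correspond to the three return values documented on `tryResolve` above.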
17 changes: 5 additions & 12 deletions pkg/causality/internal/node_test.go
@@ -62,18 +62,14 @@ func TestNodeDependOn(t *testing.T) {
func TestNodeSingleDependency(t *testing.T) {
t.Parallel()

// Node B depends on A, without any other removed dependencies.
// Node B depends on A, without any other resolved dependencies.
nodeA := NewNode()
nodeB := NewNode()
nodeB.RandWorkerID = func() workerID { return 100 }
nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0)
require.True(t, nodeA.assignTo(1))
require.Equal(t, workerID(1), nodeA.assignedWorkerID())
// Node B should be unassigned before Node A is removed.
require.Equal(t, unassigned, nodeB.assignedWorkerID())
nodeA.Remove()
// Node B should be assigned to random worker after Node A is removed.
require.Equal(t, workerID(100), nodeB.assignedWorkerID())
require.Equal(t, workerID(1), nodeB.assignedWorkerID())

// Node D depends on C, with some other resolved dependencies.
nodeC := NewNode()
@@ -128,14 +124,11 @@ func TestNodeResolveImmediately(t *testing.T) {
nodeD := NewNode()
nodeD.RandWorkerID = func() workerID { return workerID(100) }
nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0)
// NodeD should be unassigned before Node B and C are removed.
require.Equal(t, unassigned, nodeD.assignedWorkerID())
nodeB.Remove()
nodeC.Remove()
// NodeD should be assigned to random worker after Node B and C are removed.
require.Equal(t, workerID(100), nodeD.assignedWorkerID())
require.Equal(t, workerID(1), nodeD.assignedWorkerID())

// Node E depends on B and C and some other resolved dependencies.
nodeB.Remove()
nodeC.Remove()
nodeE := NewNode()
nodeE.RandWorkerID = func() workerID { return workerID(100) }
nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999)
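In the tests above, `RandWorkerID` is stubbed to a constant; in the detector itself it is backed by the round-robin counter `nextWorkerID` shown in conflict_detector.go (`d.nextWorkerID.Add(1) % int64(len(d.workers))`). A minimal stand-alone sketch of that dispatch scheme, with illustrative names rather than the tiflow types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin mimics how ConflictDetector picks a worker when a node has no
// constraining dependency: an atomic counter modulo the number of workers.
type roundRobin struct {
	next    atomic.Int64
	workers int64
}

func (r *roundRobin) randWorkerID() int64 {
	return r.next.Add(1) % r.workers
}

func main() {
	r := &roundRobin{workers: 4}
	for i := 0; i < 6; i++ {
		fmt.Print(r.randWorkerID(), " ") // 1 2 3 0 1 2
	}
	fmt.Println()
}
```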
2 changes: 1 addition & 1 deletion pkg/causality/internal/slots_test.go
@@ -63,7 +63,7 @@ func TestSlotsConcurrentOps(t *testing.T) {
freeNodeChan <- newNode()
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// test concurrent add and remove won't panic
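The slots_test.go change only raises the test timeout from 1s to 5s. The test's shape is a bounded concurrent stress loop: goroutines keep adding and removing entries until the context expires, and the test merely asserts that nothing panics. Below is a generic, standard-library-only skeleton of that pattern; it assumes none of the tiflow internals, and the names are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// A longer timeout gives slow CI machines enough time to exercise the
	// concurrent paths; the loop itself stops as soon as ctx is done.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var (
		mu    sync.Mutex
		slots = map[int]struct{}{} // stand-in for the real slots structure
		wg    sync.WaitGroup
	)
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; ; i++ {
				select {
				case <-ctx.Done():
					return
				default:
				}
				key := id*1_000_000 + i
				mu.Lock()
				slots[key] = struct{}{} // "add"
				delete(slots, key)      // "remove"
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	fmt.Println("concurrent add/remove finished without panic")
}
```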
