Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): Revert changes related to the conflict detector (#10897) #10986

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
// nextWorkerID is used to dispatch transactions round-robin.
nextWorkerID atomic.Int64

// Used to run a background goroutine to GC.
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[txnFinishedEvent]
wg sync.WaitGroup
closeCh chan struct{}
}

type txnFinishedEvent struct {
Expand All @@ -53,11 +54,12 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
numSlots uint64,
) *ConflictDetector[Worker, Txn] {
ret := &ConflictDetector[Worker, Txn]{
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
workers: workers,
slots: internal.NewSlots[*internal.Node](numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[txnFinishedEvent](),
closeCh: make(chan struct{}),
}

ret.wg.Add(1)
Expand All @@ -76,23 +78,23 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
// SendToWorker is called after all dependencies are removed.
node.SendToWorker = func(workerID int64) {
node.OnResolved = func(workerID int64) {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph,
// and remove related dependencies for these transactions which depend on this
// and resolve related dependencies for these transactions which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node is still
// occupied related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
// Send this txn to related worker as soon as all dependencies are removed.
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, sortedKeysHash)
}

Expand All @@ -104,12 +106,17 @@ func (d *ConflictDetector[Worker, Txn]) Close() {

func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
}()
for {
select {
case <-d.closeCh:
return
case notifyCallback := <-d.notifiedNodes.Out():
if notifyCallback != nil {
notifyCallback()
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.sortedKeysHash)
Expand Down
175 changes: 125 additions & 50 deletions pkg/causality/internal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
package internal

import (
"fmt"
"sync"
stdatomic "sync/atomic"
"time"

"github.com/google/btree"
"github.com/pingcap/failpoint"
"go.uber.org/atomic"
)

Expand All @@ -30,6 +27,7 @@

const (
unassigned = workerID(-2)
assignedToAny = workerID(-1)
invalidNodeID = int64(-1)
)

Expand All @@ -50,14 +48,18 @@
// Immutable fields.
id int64

// SendToWorker is used to send the node to a worker.
SendToWorker func(id workerID)
// RandWorkerID is used to select a worker randomly.
// Called when all dependencies are resolved.
OnResolved func(id workerID)
// Set the id generator to get a random ID.
RandWorkerID func() workerID
// Set the callback invoked when the node is notified.
OnNotified func(callback func())

// Following fields are used for notifying a node's dependers lock-free.
totalDependencies int32
removedDependencies int32
totalDependencies int32
resolvedDependencies int32
removedDependencies int32
resolvedList []int64

// Following fields are protected by `mu`.
mu sync.Mutex
Expand All @@ -83,10 +85,12 @@
func NewNode() (ret *Node) {
defer func() {
ret.id = genNextNodeID()
ret.SendToWorker = nil
ret.OnResolved = nil
ret.RandWorkerID = nil
ret.totalDependencies = 0
ret.resolvedDependencies = 0
ret.removedDependencies = 0
ret.resolvedList = nil
ret.assignedTo = unassigned
ret.removed = false
}()
Expand All @@ -102,20 +106,42 @@

// DependOn implements interface internal.SlotNode.
func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
resolvedDependencies, removedDependencies := int32(0), int32(0)

depend := func(target *Node) {
if target == nil {
// For a given Node, every dependency corresponds to a target.
// If target is nil it means the dependency doesn't conflict
// with any other nodes. However it's still necessary to track
// it because Node.tryResolve needs to count the number of
// resolved dependencies.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
return
}

if target.id == n.id {
panic("cannot depend on yourself")
panic("you cannot depend on yourself")
}

// The target node might be removed or modified in other places, for example
// after its corresponding transaction has been executed.
target.mu.Lock()
defer target.mu.Unlock()

// Add the node to the target's dependers if the target is not removed.
if target.assignedTo != unassigned {
// The target has already been assigned to a worker.
// In this case, record the worker ID in `resolvedList`, and this node
// probably can be sent to the same worker and executed sequentially.
resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo)
}

// Add the node to the target's dependers if the target has not been removed.
if target.removed {
// The target has already been removed.
stdatomic.AddInt32(&n.removedDependencies, 1)
removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
} else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist {
// Should never depend on a target redundantly.
panic("should never exist")
Expand All @@ -127,19 +153,24 @@
// ?: why gen new ID here?
n.id = genNextNodeID()

// `totalDependencies` must be initialized before depending on any targets.
// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
n.removedDependencies = int32(noDependencyKeyCnt)
n.resolvedList = make([]int64, 0, n.totalDependencies)
for i := 0; i < int(n.totalDependencies); i++ {
n.resolvedList = append(n.resolvedList, unassigned)
}

for _, node := range dependencyNodes {
depend(node)
}
for i := 0; i < noDependencyKeyCnt; i++ {
depend(nil)
}

n.maybeReadyToRun()
n.maybeResolve(resolvedDependencies, removedDependencies)
}

// Remove implements interface internal.SlotNode.
// Remove will be called after related transaction is executed.
func (n *Node) Remove() {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -148,12 +179,8 @@
if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
stdatomic.AddInt32(&node.removedDependencies, 1)
// use to simulate call A's maybeReadyToRun after node A may be removed
failpoint.Inject("SleepBeforeCallmaybeReadyToRun", func() {
time.Sleep(time.Millisecond * 10)
})
node.maybeReadyToRun()
removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1)
node.maybeResolve(0, removedDependencies)
return true
})
n.dependers.Clear(true)
Expand All @@ -172,7 +199,7 @@
}

n.id = invalidNodeID
n.SendToWorker = nil
n.OnResolved = nil
n.RandWorkerID = nil

// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
Expand All @@ -181,54 +208,102 @@
// or not.
}

// assigns a node to a worker. Returns `true` on success.
func (n *Node) assign() bool {
// assignTo assigns a node to a worker. Returns `true` on success.
func (n *Node) assignTo(workerID int64) bool {
n.mu.Lock()
defer n.mu.Unlock()

if n.assignedTo != unassigned {
// Already handled by some other guys.
// Already resolved by some other guys.
return false
}

workerID := n.RandWorkerID()

n.assignedTo = workerID
if n.SendToWorker != nil {
n.SendToWorker(workerID)
n.SendToWorker = nil
if n.OnResolved != nil {
n.OnResolved(workerID)
n.OnResolved = nil
}

Check warning on line 225 in pkg/causality/internal/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/causality/internal/node.go#L223-L225

Added lines #L223 - L225 were not covered by tests

if n.dependers != nil {
// `mu` must be held while accessing dependers.
n.dependers.Ascend(func(node *Node) bool {
resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1)
stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
node.maybeResolve(resolvedDependencies, 0)
return true
})
}

return true
}

// Please attention that maybeReadyToRun maybe called after the node is removed.
// Consider the following scenario:
// A only depends B, and B call B's remove first, and reduce A's removedDependencies to 0
// Then A just call A's maybeReadyToRun, and assign to a worker and remove itself
// Simultaneously, B call A's maybeReadyToRun.
// Thus maybeReadyToRun maybe called after the node is removed.
func (n *Node) maybeReadyToRun() {
if ok := n.checkReadiness(); ok {
n.assign()
func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) {
if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok {
if workerNum < 0 {
panic("Node.tryResolve must return a valid worker ID")

Check warning on line 243 in pkg/causality/internal/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/causality/internal/node.go#L243

Added line #L243 was not covered by tests
}
if n.OnNotified != nil {
// Notify the conflict detector background worker to assign the node to the worker asynchronously.
n.OnNotified(func() { n.assignTo(workerNum) })

Check warning on line 247 in pkg/causality/internal/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/causality/internal/node.go#L246-L247

Added lines #L246 - L247 were not covered by tests
} else {
// Assign the node to the worker directly.
n.assignTo(workerNum)
}
}
}

// tryResolve try to find a worker to assign the node to.
// Returns (_, false) if there is a conflict,
// returns (rand, true) if there is no conflict,
// returns (N, true) if only worker N can be used.
func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies)
if resolved && assignedTo == assignedToAny {
assignedTo = n.RandWorkerID()
}
return assignedTo, resolved
}

// checkReadiness check if all dependencies have been removed.
// Returns false if there are some conflicts, returns true if all dependencies
// are removed.
func (n *Node) checkReadiness() bool {
func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
if n.totalDependencies == 0 {
// No conflicts, can select any workers.
return true
return assignedToAny, true
}

Check warning on line 271 in pkg/causality/internal/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/causality/internal/node.go#L270-L271

Added lines #L270 - L271 were not covered by tests

if resolvedDependencies == n.totalDependencies {
firstDep := stdatomic.LoadInt64(&n.resolvedList[0])
hasDiffDep := false
for i := 1; i < int(n.totalDependencies); i++ {
curr := stdatomic.LoadInt64(&n.resolvedList[i])
// // TODO: simplify the assign-to logic; only resolve dependency nodes after
// // the corresponding transactions are executed.
// //
// // In DependOn, depend(nil) set resolvedList[i] to assignedToAny
// // for these no dependecy keys.
// if curr == assignedToAny {
// continue
// }
if firstDep != curr {
hasDiffDep = true
break
}
}
if !hasDiffDep {
// If all dependency nodes are assigned to the same worker, we can assign
// this node to the same worker directly, and they will execute sequentially.
// On the other hand, if dependency nodes are assigned to different workers,
// this node has to wait until all dependency txns are executed and all dependency
// nodes are removed.
return firstDep, true
}
}

removedDependencies := stdatomic.LoadInt32(&n.removedDependencies)
if removedDependencies > n.totalDependencies {
panic(fmt.Sprintf("removedDependencies %d > totalDependencies %d which is not expected",
removedDependencies, n.totalDependencies))
// All dependencies are removed, so assigning the node to any worker is fine.
if removedDependencies == n.totalDependencies {
return assignedToAny, true
}
return removedDependencies == n.totalDependencies

return unassigned, false
}

func (n *Node) getOrCreateDependers() *btree.BTreeG[*Node] {
Expand Down
Loading
Loading