Skip to content

Commit

Permalink
Handle Deadlocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
aoli-al committed Mar 28, 2024
1 parent f417063 commit 639822f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 35 deletions.
60 changes: 38 additions & 22 deletions core/src/main/kotlin/cmu/pasta/sfuzz/core/GlobalContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object GlobalContext {
for (thread in registeredThreads.values) {
if (thread.state == ThreadState.Paused) {
thread.state = ThreadState.Enabled
lockManager.threadUnblockedDueToDeadlock(thread.thread)
break
}
}
Expand All @@ -96,9 +97,9 @@ object GlobalContext {
fun done(result: AnalysisResult) {
loggers.forEach { it.executionDone(result) }

lockManager.done()
assert(lockManager.waitingThreads.isEmpty())
assert(syncManager.synchronizationPoints.isEmpty())
lockManager.done()
registeredThreads.clear()
scheduler.done()
}
Expand Down Expand Up @@ -138,7 +139,7 @@ object GlobalContext {
val t = Thread.currentThread()
if (!registeredThreads[t.id]!!.unparkSignaled) {
registeredThreads[t.id]?.pendingOperation = PausedOperation()
registeredThreads[t.id]?.state = ThreadState.Parked
registeredThreads[t.id]?.state = ThreadState.Paused
scheduleNextOperation(false)
} else {
registeredThreads[t.id]!!.unparkSignaled = false
Expand All @@ -153,14 +154,14 @@ object GlobalContext {
if (context.state == ThreadState.Running) {
return
}
assert(context.state == ThreadState.Parked)
assert(context.state == ThreadState.Paused)
syncManager.signal(t)
context.block()
}

fun threadUnpark(t: Thread) {
val context = registeredThreads[t.id]!!
if (context.state != ThreadState.Parked) {
if (context.state != ThreadState.Paused) {
context.unparkSignaled = true
} else {
syncManager.createWait(t, 1)
Expand All @@ -170,7 +171,7 @@ object GlobalContext {
}

fun threadUnparkDone(t: Thread) {
if (registeredThreads[t.id]!!.state == ThreadState.Parked) {
if (registeredThreads[t.id]!!.state == ThreadState.Paused) {
// SFuzz only needs to wait if `t` is parked and then
// waken up by this `unpark` operation.
syncManager.wait(t)
Expand All @@ -193,6 +194,7 @@ object GlobalContext {
}
objectNotifyAll(t)
registeredThreads[t.id]?.state = ThreadState.Completed
lockManager.threadUnblockedDueToDeadlock(t)
// We do not want to send notify all because
// we don't have monitor lock here.
var size = 0
Expand All @@ -216,20 +218,7 @@ object GlobalContext {
unlockImpl(t, t.id, false, false, true)
syncManager.synchronizationPoints.remove(System.identityHashCode(t))
}
try {
scheduleNextOperation(false)
} catch (e: TargetTerminateException) {
// If deadlock detected let's try to unblock one thread and continue.
if (e.status == -1) {
for (thread in registeredThreads.values) {
if (thread.state == ThreadState.Paused) {
thread.state = ThreadState.Running
thread.unblock()
break
}
}
}
}
scheduleNextOperationAndCheckDeadlock(false)
}
}

Expand All @@ -251,9 +240,10 @@ object GlobalContext {
context.pendingOperation = ThreadResumeOperation()
context.state = ThreadState.Enabled
} else {
lockManager.addWaitingThread(waitingObject, Thread.currentThread())
context.pendingOperation = PausedOperation()
context.state = ThreadState.Paused
checkDeadLock()
lockManager.addWaitingThread(waitingObject, Thread.currentThread())
}
unlockImpl(lockObject, t, true, true, lockObject == waitingObject)

Expand All @@ -266,7 +256,7 @@ object GlobalContext {
Thread.yield()
}
lockUnlockDone(lockObject)
scheduleNextOperation(false)
scheduleNextOperationAndCheckDeadlock(false)
}
}

Expand Down Expand Up @@ -296,6 +286,7 @@ object GlobalContext {
// We want to also catch interrupt exception here.
}
}
lockManager.threadWaitsFor.remove(t.id)
// If a thread is enabled, the lock must be available.
assert(lockManager.lock(lockObject, t.id, false, true))
context.checkInterrupt()
Expand Down Expand Up @@ -551,11 +542,12 @@ object GlobalContext {
registeredThreads[t]?.pendingOperation = PausedOperation()
registeredThreads[t]?.state = ThreadState.Paused
}
checkDeadLock()
executor.submit {
while (registeredThreads[t]!!.thread.state == Thread.State.RUNNABLE) {
Thread.yield()
}
scheduleNextOperation(false)
scheduleNextOperationAndCheckDeadlock(false)
}
}

Expand Down Expand Up @@ -587,6 +579,30 @@ object GlobalContext {
}
}

fun scheduleNextOperationAndCheckDeadlock(shouldBlockCurrentThread: Boolean) {
try {
scheduleNextOperation(shouldBlockCurrentThread)
} catch (e: TargetTerminateException) {
for (thread in registeredThreads.values) {
if (thread.state == ThreadState.Paused) {
thread.state = ThreadState.Enabled
lockManager.threadUnblockedDueToDeadlock(thread.thread)
scheduleNextOperation(shouldBlockCurrentThread)
break
}
}
}
}

fun checkDeadLock() {
val deadLock = registeredThreads.values.toList().none { it.schedulable() }
if (deadLock) {
registeredThreads[Thread.currentThread().id]!!.state = ThreadState.Enabled
lockManager.threadUnblockedDueToDeadlock(Thread.currentThread())
throw TargetTerminateException(-1)
}
}

fun scheduleNextOperation(shouldBlockCurrentThread: Boolean) {
// Our current design makes sure that reschedule is only called
// by scheduled thread.
Expand Down
32 changes: 23 additions & 9 deletions core/src/main/kotlin/cmu/pasta/sfuzz/core/RuntimeDelegate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ class RuntimeDelegate : Delegate() {
}

override fun onThreadStart(t: Thread) {
if (checkEntered()) return
if (checkEntered()) {
skipFunctionEntered.set(1 + skipFunctionEntered.get())
return
}
GlobalContext.threadStart(t)
skipFunctionEntered.set(1 + skipFunctionEntered.get())
entered.set(false)
}

override fun onThreadStartDone(t: Thread) {
skipFunctionEntered.set(skipFunctionEntered.get() - 1)
if (checkEntered()) return
GlobalContext.threadStartDone(t)
entered.set(false)
Expand All @@ -68,8 +73,11 @@ class RuntimeDelegate : Delegate() {

override fun onObjectWait(o: Any) {
if (checkEntered()) return
GlobalContext.objectWait(o)
entered.set(false)
try {
GlobalContext.objectWait(o)
} finally {
entered.set(false)
}
}

override fun onObjectWaitDone(o: Any) {
Expand Down Expand Up @@ -166,9 +174,12 @@ class RuntimeDelegate : Delegate() {
skipFunctionEntered.set(1 + skipFunctionEntered.get())
return
}
GlobalContext.conditionAwait(o)
entered.set(false)
skipFunctionEntered.set(1 + skipFunctionEntered.get())
try {
GlobalContext.conditionAwait(o)
} finally {
entered.set(false)
skipFunctionEntered.set(1 + skipFunctionEntered.get())
}
}

override fun onConditionAwaitDone(o: Condition) {
Expand Down Expand Up @@ -353,9 +364,12 @@ class RuntimeDelegate : Delegate() {
skipFunctionEntered.set(1 + skipFunctionEntered.get())
return
}
GlobalContext.latchAwait(latch)
entered.set(false)
skipFunctionEntered.set(skipFunctionEntered.get() + 1)
try {
GlobalContext.latchAwait(latch)
} finally {
entered.set(false)
skipFunctionEntered.set(skipFunctionEntered.get() + 1)
}
}

override fun onLatchAwaitDone(latch: CountDownLatch) {
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/kotlin/cmu/pasta/sfuzz/core/ThreadContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ enum class ThreadState {
Enabled,
Running,
Paused,
Parked,
Completed,
// Thread is started but not yet available.
STARTED,
}

class ThreadContext(val thread: Thread, val index: Int) {
var state = ThreadState.STARTED
var state = ThreadState.Paused
var unparkSignaled = false
var interruptSignaled = false

Expand All @@ -31,6 +28,8 @@ class ThreadContext(val thread: Thread, val index: Int) {
sync.block()
}

fun schedulable() = state == ThreadState.Enabled || state == ThreadState.Running

fun unblock() {
if (sync.isBlocked) {
sync.unblock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
class LockManager {
val lockContextManager = ReferencedContextManager<LockContext> { ReentrantLockContext() }
val waitingThreads = mutableMapOf<Int, MutableList<Long>>()
val threadWaitsFor = mutableMapOf<Long, Int>()
val conditionToLock = mutableMapOf<Condition, Lock>()
val lockToConditions = mutableMapOf<Lock, MutableList<Condition>>()

fun threadUnblockedDueToDeadlock(t: Thread) {
val id = threadWaitsFor[t.id] ?: return
waitingThreads[id]?.remove(t.id)
if (waitingThreads[id]?.isEmpty() == true) {
waitingThreads.remove(id)
}
threadWaitsFor.remove(t.id)
}

fun getLockContext(lock: Any): LockContext {
return lockContextManager.getLockContext(lock)
}
Expand All @@ -38,13 +48,16 @@ class LockManager {
if (id !in waitingThreads) {
waitingThreads[id] = mutableListOf()
}
assert(t.id !in waitingThreads[id]!!)
waitingThreads[id]!!.add(t.id)
threadWaitsFor[t.id] = id
}

// TODO(aoli): can we merge this logic with `objectNotifyImply`?
fun threadInterruptDuringObjectWait(waitingObject: Any, lockObject: Any, context: ThreadContext) {
val id = System.identityHashCode(waitingObject)
val lockContext = getLockContext(lockObject)
threadWaitsFor.remove(context.thread.id)
waitingThreads[id]?.remove(context.thread.id)
if (waitingThreads[id]?.isEmpty() == true) {
waitingThreads.remove(id)
Expand Down Expand Up @@ -83,6 +96,7 @@ class LockManager {

fun done() {
assert(waitingThreads.isEmpty())
assert(threadWaitsFor.isEmpty())
conditionToLock.clear()
lockToConditions.clear()
lockContextManager.done()
Expand Down

0 comments on commit 639822f

Please sign in to comment.