Skip to content

Commit

Permalink
Use proper timers in sleepAsync.
Browse files Browse the repository at this point in the history
Fixes #7886.

Fixes #7758 (remember that `poll` waits for 500ms, change the
             timeout to something like 5s and you'll get
             1 poll call for that code)

Fixes #6929.

Fixes #3909.
  • Loading branch information
dom96 committed Jul 5, 2018
1 parent a1457bf commit 29bf869
Showing 1 changed file with 12 additions and 37 deletions.
49 changes: 12 additions & 37 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -165,35 +165,14 @@ export asyncfutures, asyncstreams

type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
callbacks*: Deque[proc ()]

proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
#Process just part if timers at a step
var count = p.timers.len
let t = epochTime()
while count > 0 and t >= p.timers[0].finishAt:
p.timers.pop().fut.complete()
dec count
didSomeWork = true

proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
while p.callbacks.len > 0:
var cb = p.callbacks.popFirst()
cb()
didSomeWork = true

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
result = timeout
if p.timers.len > 0:
let timerTimeout = p.timers[0].finishAt
let curTime = epochTime()
if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0

proc callSoon(cbproc: proc ()) {.gcsafe.}

proc initCallSoonProc =
Expand Down Expand Up @@ -249,7 +228,6 @@ when defined(windows) or defined(nimdoc):
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
Expand Down Expand Up @@ -290,20 +268,19 @@ when defined(windows) or defined(nimdoc):
proc hasPendingOperations*(): bool =
## Returns `true` if the global dispatcher has pending operations.
let p = getGlobalDispatcher()
p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
p.handles.len != 0 or p.callbacks.len != 0

proc runOnce(timeout = 500): bool =
let p = getGlobalDispatcher()
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
if p.handles.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

result = false
if p.handles.len != 0:
let at = p.adjustedTimeout(timeout)
var llTimeout =
if at == -1: winlean.INFINITE
else: at.int32
if timeout == -1: winlean.INFINITE
else: timeout.int32

var lpNumberOfBytesTransferred: Dword
var lpCompletionKey: ULONG_PTR
Expand Down Expand Up @@ -344,8 +321,6 @@ when defined(windows) or defined(nimdoc):
result = false
else: raiseOSError(errCode)

# Timer processing.
processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)

Expand Down Expand Up @@ -1088,7 +1063,6 @@ else:
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector[AsyncData]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
Expand Down Expand Up @@ -1150,7 +1124,7 @@ else:

proc hasPendingOperations*(): bool =
let p = getGlobalDispatcher()
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
not p.selector.isEmpty() or p.callbacks.len != 0

template processBasicCallbacks(ident, rwlist: untyped) =
# Process pending descriptor and AsyncEvent callbacks.
Expand Down Expand Up @@ -1226,14 +1200,14 @@ else:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}

if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
if p.selector.isEmpty() and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

result = false
if not p.selector.isEmpty():
var keys: array[64, ReadyKey]
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
var count = p.selector.selectInto(timeout, keys)
for i in 0..<count:
var custom = false
let fd = keys[i].fd
Expand Down Expand Up @@ -1271,8 +1245,6 @@ else:
if wLength > 0: incl(newEvents, Event.Write)
p.selector.updateHandle(SocketHandle(fd), newEvents)

# Timer processing.
processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)

Expand Down Expand Up @@ -1517,8 +1489,11 @@ proc sleepAsync*(ms: int | float): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``ms`` milliseconds.
var retFuture = newFuture[void]("sleepAsync")
let p = getGlobalDispatcher()
p.timers.push((epochTime() + (ms / 1000), retFuture))
addTimer(
ms,
oneshot=true,
proc (fd: AsyncFD): bool = retFuture.complete()
)
return retFuture

proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] =
Expand Down

0 comments on commit 29bf869

Please sign in to comment.