Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'firstCompleted' and 'firstCompetedFuture' to 'asyncfutures2' #339

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 105 additions & 12 deletions chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {.
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
##
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
## On cancellation, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.`and`")
proc cb(data: pointer) =
if not(retFuture.finished()):
Expand Down Expand Up @@ -648,7 +648,7 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## is failed, the result future will also be failed, if ``fut1`` future is
## completed, the result future will also be completed.
##
## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
## On cancellation, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.or")
var cb: proc(udata: pointer) {.gcsafe, raises: [Defect].}
cb = proc(udata: pointer) {.gcsafe, raises: [Defect].} =
Expand Down Expand Up @@ -854,11 +854,11 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] =

proc allFutures*(futs: varargs[FutureBase]): Future[void] =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## are completed, failed or cancelled.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On cancellation, all the awaited futures ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[void]("chronos.allFutures()")
let totalFutures = len(futs)
var completedFutures = 0
Expand Down Expand Up @@ -892,11 +892,11 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] =

proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## are completed, failed or cancelled.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On cancellation, all the awaited futures ``futs`` WILL NOT BE cancelled.
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts: seq[FutureBase]
for future in futs:
Expand All @@ -905,14 +905,14 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =

proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
## will be completed, failed or cancelled.
##
## Returned sequence will hold all the Future[T] objects passed to
## ``allCompleted`` with the order preserved.
##
## If the argument is empty, the returned future COMPLETES immediately.
##
## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled.
## On cancellation, all the awaited futures ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[seq[Future[T]]]("chronos.allFinished()")
let totalFutures = len(futs)
var completedFutures = 0
Expand Down Expand Up @@ -945,13 +945,13 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] =

proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
## Returns a future which will complete and return completed Future[T] inside,
## when one of the futures in ``futs`` will be completed, failed or canceled.
## when one of the futures in ``futs`` will be completed, failed or cancelled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished Future[T].
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
## On cancellation, futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[Future[T]]("chronos.one()")

# Because we can't capture varargs[T] in closures we need to create copy.
Expand Down Expand Up @@ -990,15 +990,108 @@ proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] =
retFuture.cancelCallback = cancellation
return retFuture

proc firstCompletedFuture*(futs: varargs[FutureBase]): Future[FutureBase] =
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` is completed.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success, the returned Future will hold the completed FutureBase.
##
## If all futures fail naturally or due to cancellation, the returned
## future will be failed as well.
##
## On cancellation, futures in ``futs`` WILL NOT BE cancelled.

var retFuture = newFuture[FutureBase]("chronos.firstCompletedFuture()")

# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs

# If one of the Future[T] already finished we return it as result
for fut in nfuts:
if fut.completed():
retFuture.complete(fut)
return retFuture

if len(nfuts) == 0:
retFuture.fail(newException(ValueError, "Empty Future[T] list"))
return

var failedFutures = 0

var cb: proc(udata: pointer) {.gcsafe, raises: [Defect].}
cb = proc(udata: pointer) {.gcsafe, raises: [Defect].} =
if not(retFuture.finished()):
var res: FutureBase
var rfut = cast[FutureBase](udata)
if rfut.completed:
for i in 0..<len(nfuts):
if nfuts[i] != rfut:
nfuts[i].removeCallback(cb)
else:
res = nfuts[i]
retFuture.complete(res)
else:
inc failedFutures
if failedFutures == nfuts.len:
retFuture.fail(newException(CatchableError,
"None of the operations completed successfully"))

proc cancellation(udata: pointer) =
# On cancel we remove all our callbacks only.
for i in 0..<len(nfuts):
if not(nfuts[i].finished()):
nfuts[i].removeCallback(cb)

for fut in nfuts:
fut.addCallback(cb, cast[pointer](fut))

retFuture.cancelCallback = cancellation
return retFuture

proc firstCompleted*[T](futs: varargs[Future[T]]): Future[T] =
## On success, the returned Future will hold the result of the first
## completed imput Future.
##
## If the varargs list is empty, the returned future FAILS immediately.
##
## If all futures fail naturally or due to cancellation, the returned
## future will be failed as well.
##
## On cancellation, futures in ``futs`` WILL NOT BE cancelled.

let subFuture = firstCompletedFuture(futs)
if subFuture.completed:
return Future[T](subFuture.read)

var retFuture = newFuture[T]("chronos.firstCompleted()")

if subFuture.finished: # It must be failed ot cancelled
retFuture.fail(subFuture.error)
return retFuture

proc cb(udata: pointer) {.gcsafe, raises: [Defect].} =
let subFuture = cast[Future[FutureBase]](udata)
if subFuture.completed:
retFuture.complete(Future[T](subFuture.read).read)
else:
retFuture.fail(subFuture.error)

subFuture.addCallback(cb, cast[pointer](subFuture))

retFuture.cancelCallback = proc (udata: pointer) =
subFuture.cancel()

proc race*(futs: varargs[FutureBase]): Future[FutureBase] =
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
## when one of the futures in ``futs`` is completed, failed or cancelled.
##
## If the argument is empty, the returned future FAILS immediately.
##
## On success returned Future will hold finished FutureBase.
##
## On cancel futures in ``futs`` WILL NOT BE cancelled.
## On cancellation, futures in ``futs`` WILL NOT BE cancelled.
var retFuture = newFuture[FutureBase]("chronos.race()")

# Because we can't capture varargs[T] in closures we need to create copy.
Expand Down