Skip to content

Commit

Permalink
feat: Add fail and complete state to Rx
Browse files Browse the repository at this point in the history
  • Loading branch information
Quenty committed Dec 15, 2024
1 parent 0a48559 commit 4bb843b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
42 changes: 18 additions & 24 deletions src/rx/src/Shared/Rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -242,34 +242,28 @@ function Rx.fromPromise(promise)
assert(Promise.isPromise(promise), "Bad promise")

return Observable.new(function(sub)
if promise:IsFulfilled() then
sub:Fire(promise:Wait())
sub:Complete()
return nil
-- Save a task.spawn call
if not promise:IsPending() then
local results = table.pack(promise:GetResults())

if results[1] then
sub:Fire(table.unpack(results, 2, results.n))
sub:Complete()
else
sub:Fail(table.unpack(results, 2, results.n))
end
end

local maid = Maid.new()
return task.spawn(function()
local results = table.pack(promise:Yield())

local pending = true
maid:GiveTask(function()
pending = false
if results[1] then
sub:Fire(table.unpack(results, 2, results.n))
sub:Complete()
else
sub:Fail(table.unpack(results, 2, results.n))
end
end)

promise:Then(
function(...)
if pending then
sub:Fire(...)
sub:Complete()
end
end,
function(...)
if pending then
sub:Fail(...)
sub:Complete()
end
end)

return maid
end)
end

Expand Down
11 changes: 7 additions & 4 deletions src/rx/src/Shared/Subscription.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,17 @@ end

--[=[
Fails the subscription, preventing anything else from emitting.
@param ... any
]=]
function Subscription:Fail()
function Subscription:Fail(...)
if self._state ~= SubscriptionStateTypes.PENDING then
return
end

self._state = SubscriptionStateTypes.FAILED

if self._failCallback then
self._failCallback()
self._failCallback(...)
end

self:_doCleanup()
Expand Down Expand Up @@ -157,15 +158,17 @@ end
--[=[
Completes the subscription, preventing anything else from being
emitted.
@param ... any
]=]
function Subscription:Complete()
function Subscription:Complete(...)
if self._state ~= SubscriptionStateTypes.PENDING then
return
end

self._state = SubscriptionStateTypes.COMPLETE
if self._completeCallback then
self._completeCallback()
self._completeCallback(...)
end

self:_doCleanup()
Expand Down

0 comments on commit 4bb843b

Please sign in to comment.