Skip to content

Commit

Permalink
add WaitGroup type
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Nov 8, 2023
1 parent c79d4ec commit ca9010b
Show file tree
Hide file tree
Showing 40 changed files with 360 additions and 476 deletions.
87 changes: 51 additions & 36 deletions blocking.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// BlockingFirst subscribes to the source Observable, and returns
// the first value emitted by the source.
Expand All @@ -17,18 +13,22 @@ import (
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingFirst(ctx context.Context) (v T, err error) {
child, cancel := context.WithCancel(ctx)
child, wg := waitgroup.Install(child)
var wg WaitGroup

child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg))

res := Error[T](ErrEmpty)

var noop bool

wg.Add(1)

obs.Subscribe(child, func(n Notification[T]) {
if noop {
return
Expand Down Expand Up @@ -71,10 +71,11 @@ func (obs Observable[T]) BlockingFirst(ctx context.Context) (v T, err error) {
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingFirstOrElse(ctx context.Context, def T) T {
v, err := obs.BlockingFirst(ctx)
if err != nil {
Expand All @@ -95,16 +96,20 @@ func (obs Observable[T]) BlockingFirstOrElse(ctx context.Context, def T) T {
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingLast(ctx context.Context) (v T, err error) {
child, cancel := context.WithCancel(ctx)
child, wg := waitgroup.Install(child)
var wg WaitGroup

child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg))

res := Error[T](ErrEmpty)

wg.Add(1)

obs.Subscribe(child, func(n Notification[T]) {
if n.HasValue || n.HasError {
res = n
Expand Down Expand Up @@ -143,10 +148,11 @@ func (obs Observable[T]) BlockingLast(ctx context.Context) (v T, err error) {
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingLastOrElse(ctx context.Context, def T) T {
v, err := obs.BlockingLast(ctx)
if err != nil {
Expand All @@ -168,18 +174,22 @@ func (obs Observable[T]) BlockingLastOrElse(ctx context.Context, def T) T {
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingSingle(ctx context.Context) (v T, err error) {
child, cancel := context.WithCancel(ctx)
child, wg := waitgroup.Install(child)
var wg WaitGroup

child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg))

res := Error[T](ErrEmpty)

var noop bool

wg.Add(1)

obs.Subscribe(child, func(n Notification[T]) {
if noop {
return
Expand Down Expand Up @@ -232,14 +242,19 @@ func (obs Observable[T]) BlockingSingle(ctx context.Context) (v T, err error) {
//
// Like any other Blocking methods, this method waits for every goroutine
// started during subscription to complete before returning.
// To have this works properly, one must use [Go] function rather than
// go statements to start new goroutines during subscription, especially
// when one need to subscribe to other Observables in a goroutine (otherwise,
// their program might panic randomly).
// To have this work properly, Observables must use [WaitGroupFromContext]
// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go
// statements to start new goroutines during subscription, especially when
// they need to subscribe to other Observables in a goroutine; otherwise,
// runtime panicking might happen randomly (WaitGroup misuse).
func (obs Observable[T]) BlockingSubscribe(ctx context.Context, sink Observer[T]) error {
var wg WaitGroup

child := WithWaitGroup(ctx, &wg)

var res Notification[T]

child, wg := waitgroup.Install(ctx)
wg.Add(1)

obs.Subscribe(child, func(n Notification[T]) {
res = n
Expand Down
15 changes: 5 additions & 10 deletions combinelatest2.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// CombineLatest2 combines multiple Observables to create an Observable that
// emits projection of latest values of each of its input Observables.
Expand All @@ -18,6 +14,7 @@ func CombineLatest2[T1, T2, R any](
}

return func(ctx context.Context, sink Observer[R]) {
wg := WaitGroupFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

noop := make(chan struct{})
Expand All @@ -30,9 +27,7 @@ func CombineLatest2[T1, T2, R any](
chan1 := make(chan Notification[T1])
chan2 := make(chan Notification[T2])

ctxHoisted := waitgroup.Hoist(ctx)

Go(ctxHoisted, func() {
wg.Go(func() {
var s combineLatestState2[T1, T2]

done := false
Expand All @@ -47,8 +42,8 @@ func CombineLatest2[T1, T2, R any](
}
})

Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
}
}

Expand Down
17 changes: 6 additions & 11 deletions combinelatest3.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// CombineLatest3 combines multiple Observables to create an Observable that
// emits projection of latest values of each of its input Observables.
Expand All @@ -19,6 +15,7 @@ func CombineLatest3[T1, T2, T3, R any](
}

return func(ctx context.Context, sink Observer[R]) {
wg := WaitGroupFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

noop := make(chan struct{})
Expand All @@ -32,9 +29,7 @@ func CombineLatest3[T1, T2, T3, R any](
chan2 := make(chan Notification[T2])
chan3 := make(chan Notification[T3])

ctxHoisted := waitgroup.Hoist(ctx)

Go(ctxHoisted, func() {
wg.Go(func() {
var s combineLatestState3[T1, T2, T3]

done := false
Expand All @@ -51,9 +46,9 @@ func CombineLatest3[T1, T2, T3, R any](
}
})

Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
}
}

Expand Down
19 changes: 7 additions & 12 deletions combinelatest4.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// CombineLatest4 combines multiple Observables to create an Observable that
// emits projection of latest values of each of its input Observables.
Expand All @@ -20,6 +16,7 @@ func CombineLatest4[T1, T2, T3, T4, R any](
}

return func(ctx context.Context, sink Observer[R]) {
wg := WaitGroupFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

noop := make(chan struct{})
Expand All @@ -34,9 +31,7 @@ func CombineLatest4[T1, T2, T3, T4, R any](
chan3 := make(chan Notification[T3])
chan4 := make(chan Notification[T4])

ctxHoisted := waitgroup.Hoist(ctx)

Go(ctxHoisted, func() {
wg.Go(func() {
var s combineLatestState4[T1, T2, T3, T4]

done := false
Expand All @@ -55,10 +50,10 @@ func CombineLatest4[T1, T2, T3, T4, R any](
}
})

Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
}
}

Expand Down
21 changes: 8 additions & 13 deletions combinelatest5.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// CombineLatest5 combines multiple Observables to create an Observable that
// emits projection of latest values of each of its input Observables.
Expand All @@ -21,6 +17,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
}

return func(ctx context.Context, sink Observer[R]) {
wg := WaitGroupFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

noop := make(chan struct{})
Expand All @@ -36,9 +33,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
chan4 := make(chan Notification[T4])
chan5 := make(chan Notification[T5])

ctxHoisted := waitgroup.Hoist(ctx)

Go(ctxHoisted, func() {
wg.Go(func() {
var s combineLatestState5[T1, T2, T3, T4, T5]

done := false
Expand All @@ -59,11 +54,11 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
}
})

Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) })
wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) })
}
}

Expand Down
23 changes: 9 additions & 14 deletions combinelatest6.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"

"github.com/b97tsk/rx/internal/waitgroup"
)
import "context"

// CombineLatest6 combines multiple Observables to create an Observable that
// emits projection of latest values of each of its input Observables.
Expand All @@ -22,6 +18,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
}

return func(ctx context.Context, sink Observer[R]) {
wg := WaitGroupFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

noop := make(chan struct{})
Expand All @@ -38,9 +35,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
chan5 := make(chan Notification[T5])
chan6 := make(chan Notification[T6])

ctxHoisted := waitgroup.Hoist(ctx)

Go(ctxHoisted, func() {
wg.Go(func() {
var s combineLatestState6[T1, T2, T3, T4, T5, T6]

done := false
Expand All @@ -63,12 +58,12 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
}
})

Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) })
Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) })
wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) })
wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) })
wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) })
wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) })
wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) })
wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) })
}
}

Expand Down
Loading

0 comments on commit ca9010b

Please sign in to comment.