Skip to content

Commit

Permalink
update(*): rewrite OnLastNotification -> OnTermination
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Mar 28, 2024
1 parent d857fa6 commit 3a4dc3e
Show file tree
Hide file tree
Showing 57 changed files with 72 additions and 72 deletions.
2 changes: 1 addition & 1 deletion audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type auditObservable[T, U any] struct {

func (obs auditObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion channelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func Channelize[T any](join func(upstream <-chan Notification[T], downstream cha
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

upstream := make(chan Notification[T])
downstream := make(chan Notification[T])
Expand Down
2 changes: 1 addition & 1 deletion combinelatest2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func CombineLatest2[T1, T2, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func CombineLatest3[T1, T2, T3, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest4.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func CombineLatest4[T1, T2, T3, T4, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest5.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest6.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest7.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest8.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
2 changes: 1 addition & 1 deletion combinelatest9.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
return func(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
sink = sink.OnLastNotification(func() {
sink = sink.OnTermination(func() {
cancel()
close(noop)
})
Expand Down
4 changes: 2 additions & 2 deletions concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (obs concatMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
}

c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand All @@ -162,7 +162,7 @@ func (obs concatMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {

func (obs concatMapObservable[T, R]) SubscribeWithBuffering(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type connectObservable[T, R any] struct {

func (obs connectObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)
oops := func() { sink.Error(ErrOops) }
subject := Try01(obs.Connector, oops)
Try11(obs.Selector, subject.Observable, oops).Subscribe(c, sink)
Expand Down
4 changes: 2 additions & 2 deletions contains.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Contains[T any](pred func(v T) bool) Operator[T, bool] {
func(source Observable[T]) Observable[bool] {
return func(c Context, sink Observer[bool]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down Expand Up @@ -42,7 +42,7 @@ func ContainsElement[T comparable](v T) Operator[T, bool] {
func(source Observable[T]) Observable[bool] {
return func(c Context, sink Observer[bool]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
2 changes: 1 addition & 1 deletion debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type debounceObservable[T, U any] struct {

func (obs debounceObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type delayObservable[T any] struct {

func (obs delayObservable[T]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion dematerialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Dematerialize[_ Notification[T], T any]() Operator[Notification[T], T] {
func(source Observable[Notification[T]]) Observable[T] {
return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
8 changes: 4 additions & 4 deletions do.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func OnError[T any](f func(err error)) Operator[T, T] {
)
}

// OnLastNotification mirrors the source Observable, and calls f when
// the source completes or emits an error notification.
func OnLastNotification[T any](f func()) Operator[T, T] {
// OnTermination mirrors the source Observable, and calls f when the source
// emits a notification of error or completion.
func OnTermination[T any](f func()) Operator[T, T] {
return NewOperator(
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
source.Subscribe(c, sink.OnLastNotification(f))
source.Subscribe(c, sink.OnTermination(f))
}
},
)
Expand Down
4 changes: 2 additions & 2 deletions do_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestDo(t *testing.T) {
doTest(t, func(f func()) rx.Operator[int, int] {
return rx.OnError[int](func(error) { f() })
}, 0, 0, 0, 1)
doTest(t, rx.OnLastNotification[int], 1, 2, 3, 4)
doTest(t, rx.OnTermination[int], 1, 2, 3, 4)

NewTestSuite[string](t).Case(
rx.Pipe1(rx.Just("A"), rx.Do(func(n rx.Notification[string]) { panic(ErrTest) })),
Expand All @@ -35,7 +35,7 @@ func TestDo(t *testing.T) {
rx.Pipe1(rx.Throw[string](ErrTest), rx.OnError[string](func(err error) { panic(err) })),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(rx.Throw[string](ErrTest), rx.OnLastNotification[string](func() { panic(ErrTest) })),
rx.Pipe1(rx.Throw[string](ErrTest), rx.OnTermination[string](func() { panic(ErrTest) })),
rx.ErrOops, ErrTest,
)
}
Expand Down
2 changes: 1 addition & 1 deletion every.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Every[T any](pred func(v T) bool) Operator[T, bool] {
func(source Observable[T]) Observable[bool] {
return func(c Context, sink Observer[bool]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
2 changes: 1 addition & 1 deletion exhaust.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type exhaustMapObservable[T, R any] struct {

func (obs exhaustMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion find.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Find[T any](pred func(v T) bool) Operator[T, T] {
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
8 changes: 4 additions & 4 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (sink Observer[T]) ElementsOnly(n Notification[T]) {
}
}

// OnLastNotification creates an Observer that passes incoming emissions to
// sink, and when a notification of error or completion passes in, calls f
// just before passing it to sink.
func (sink Observer[T]) OnLastNotification(f func()) Observer[T] {
// OnTermination creates an Observer that passes incoming emissions to sink,
// and when a notification of error or completion passes in, calls f just
// before passing it to sink.
func (sink Observer[T]) OnTermination(f func()) Observer[T] {
return func(n Notification[T]) {
switch n.Kind {
case KindNext:
Expand Down
2 changes: 1 addition & 1 deletion sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type sampleObservable[T, U any] struct {

func (obs sampleObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ package rx
func Serialize[T any](c Context, sink Observer[T]) (Context, Observer[T]) {
c, cancel := c.WithCancel()
u := new(unicast[T])
u.subscribe(c, sink.OnLastNotification(cancel))
u.subscribe(c, sink.OnTermination(cancel))
return c, u.emit
}
2 changes: 1 addition & 1 deletion share.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (obs *shareObservable[T]) Subscribe(c Context, sink Observer[T]) {
}

c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

obs.Subject.Subscribe(c, sink)

Expand Down
2 changes: 1 addition & 1 deletion single.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func Single[T any]() Operator[T, T] {
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var first struct {
Value T
Expand Down
8 changes: 4 additions & 4 deletions skipuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type skipUntilObservable[T, U any] struct {

func (obs skipUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down Expand Up @@ -65,7 +65,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
)
}

finish := func(n Notification[T]) {
terminate := func(n Notification[T]) {
old := x.Context.Swap(sentinel)

cancel()
Expand All @@ -78,7 +78,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
select {
default:
case <-c.Done():
finish(Error[T](c.Err()))
terminate(Error[T](c.Err()))
return
}

Expand All @@ -94,7 +94,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
sink(n)
}
case KindError, KindComplete:
finish(n)
terminate(n)
}
})
}
2 changes: 1 addition & 1 deletion switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type switchMapObservable[T, R any] struct {

func (obs switchMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion take.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func Take[T any](count int) Operator[T, T] {

return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
8 changes: 4 additions & 4 deletions takeuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type takeUntilObservable[T, U any] struct {

func (obs takeUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down Expand Up @@ -84,7 +84,7 @@ func (obs takeUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
x.Source.Add(1)
x.Source.Unlock()

finish := func(n Notification[T]) {
terminate := func(n Notification[T]) {
defer x.Source.Done()

old := x.Context.Swap(sentinel)
Expand All @@ -99,7 +99,7 @@ func (obs takeUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
select {
default:
case <-c.Done():
finish(Error[T](c.Err()))
terminate(Error[T](c.Err()))
return
}

Expand All @@ -110,7 +110,7 @@ func (obs takeUntilObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
sink(n)
}
case KindError, KindComplete:
finish(n)
terminate(n)
}
})
}
2 changes: 1 addition & 1 deletion takewhile.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func TakeWhile[T any](pred func(v T) bool) Operator[T, T] {
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var noop bool

Expand Down
2 changes: 1 addition & 1 deletion throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type throttleObservable[T, U any] struct {

func (obs throttleObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel)
sink = sink.OnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down
2 changes: 1 addition & 1 deletion unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (u *unicast[T]) subscribe(c Context, sink Observer[T]) {
done := c.Done()
if done != nil {
stop := c.AfterFunc(func() { u.emit(Error[T](c.Err())) })
sink = sink.OnLastNotification(func() { stop() })
sink = sink.OnTermination(func() { stop() })
}

u.Context = c.Context
Expand Down
Loading

0 comments on commit 3a4dc3e

Please sign in to comment.