Skip to content

Commit

Permalink
add Serialize function
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Mar 19, 2024
1 parent 19b76c9 commit f8354a5
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 135 deletions.
9 changes: 3 additions & 6 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type mergeWithObservable[T any] struct {
}

func (obs mergeWithObservable[T]) Subscribe(c Context, sink Observer[T]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel).Serialized(c)
c, sink = Serialize(c, sink)

var num atomic.Uint32

Expand Down Expand Up @@ -157,8 +156,7 @@ func (obs mergeMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
return
}

c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel).Serialized(c)
c, sink = Serialize(c, sink)

var x struct {
sync.Mutex
Expand Down Expand Up @@ -253,8 +251,7 @@ func (obs mergeMapObservable[T, R]) Subscribe(c Context, sink Observer[R]) {
}

func (obs mergeMapObservable[T, R]) SubscribeWithBuffering(c Context, sink Observer[R]) {
c, cancel := c.WithCancel()
sink = sink.OnLastNotification(cancel).Serialized(c)
c, sink = Serialize(c, sink)

var x struct {
sync.Mutex
Expand Down
5 changes: 1 addition & 4 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ func (m *multicast[T]) subscribe(c Context, sink Observer[T]) {

lastn := m.LastN
if lastn.Kind == 0 {
var cancel CancelFunc

c, cancel = c.WithCancel()
sink = sink.OnLastNotification(cancel).Serialized(c)
c, sink = Serialize(c, sink)

observer := sink
m.Mobs.Add(&observer)
Expand Down
5 changes: 1 addition & 4 deletions multicastreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func (m *multicastReplay[T]) subscribe(c Context, sink Observer[T]) {

lastn := m.LastN
if lastn.Kind == 0 {
var cancel CancelFunc

c, cancel = c.WithCancel()
sink = sink.OnLastNotification(cancel).Serialized(c)
c, sink = Serialize(c, sink)

observer := sink
m.Mobs.Add(&observer)
Expand Down
4 changes: 1 addition & 3 deletions multicastreplay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ func TestMulticastReplay(t *testing.T) {

subscribeThenComplete := rx.NewObservable(
func(c rx.Context, sink rx.Observer[string]) {
c, cancel := c.WithCancel()
sink = sink.Serialized(c)
c, sink = rx.Serialize(c, sink)
m.Subscribe(c, sink)
sink.Complete()
cancel()
},
)

Expand Down
119 changes: 1 addition & 118 deletions observer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package rx

import (
"context"
"runtime"
"sync"
)
import "runtime"

// An Observer is a consumer of notifications delivered by an [Observable].
type Observer[T any] func(n Notification[T])
Expand Down Expand Up @@ -57,119 +53,6 @@ func (sink Observer[T]) OnLastNotification(f func()) Observer[T] {
}
}

// Serialized creates an Observer that passes incoming emissions to sink
// in a mutually exclusive way.
func (sink Observer[T]) Serialized(c Context) Observer[T] {
var x struct {
Mu sync.Mutex
Emitting bool
LastN Notification[struct{}]
Queue []T
Context context.Context
DoneChan <-chan struct{}
Observer Observer[T]
}

x.Context = c.Context
x.DoneChan = c.Done()
x.Observer = sink

return func(n Notification[T]) {
x.Mu.Lock()

if x.LastN.Kind != 0 {
x.Mu.Unlock()
return
}

switch n.Kind {
case KindError:
x.LastN = Error[struct{}](n.Error)
case KindComplete:
x.LastN = Complete[struct{}]()
}

if x.Emitting {
if n.Kind == KindNext {
x.Queue = append(x.Queue, n.Value)
}

x.Mu.Unlock()

return
}

x.Emitting = true

x.Mu.Unlock()

throw := func(err error) {
x.Mu.Lock()
x.Emitting = false
x.LastN = Error[struct{}](err)
x.Queue = nil
x.Mu.Unlock()
x.Observer.Error(err)
}

oops := func() { throw(ErrOops) }

sink := x.Observer

switch n.Kind {
case KindNext:
select {
default:
case <-x.DoneChan:
throw(x.Context.Err())
return
}

Try1(sink, n, oops)

case KindError, KindComplete:
sink(n)
return
}

for {
x.Mu.Lock()

if x.Queue == nil {
lastn := x.LastN

x.Emitting = false
x.Mu.Unlock()

switch lastn.Kind {
case KindError:
sink.Error(lastn.Error)
case KindComplete:
sink.Complete()
}

return
}

q := x.Queue
x.Queue = nil

x.Mu.Unlock()

for _, v := range q {
select {
default:
case <-x.DoneChan:
throw(x.Context.Err())
return
}

Try1(sink, Next(v), oops)
}
}
}
}

// WithRuntimeFinalizer creates an Observer with a runtime finalizer set to
// run sink.Error(ErrFinalized) in a goroutine.
// sink must be safe for concurrent use.
Expand Down
123 changes: 123 additions & 0 deletions serialize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package rx

import (
"context"
"sync"
)

// Serialize returns an Observer that passes incoming emissions to sink
// in a mutually exclusive way.
// Serialize also returns a copy of c that will be cancelled when sink is
// about to receive a notification of error or completion.
func Serialize[T any](c Context, sink Observer[T]) (Context, Observer[T]) {
c, cancel := c.WithCancel()

var x struct {
Mu sync.Mutex
Emitting bool
LastN Notification[struct{}]
Queue []T
Context context.Context
DoneChan <-chan struct{}
Observer Observer[T]
}

x.Context = c.Context
x.DoneChan = c.Done()
x.Observer = sink.OnLastNotification(cancel)

return c, func(n Notification[T]) {
x.Mu.Lock()

if x.LastN.Kind != 0 {
x.Mu.Unlock()
return
}

switch n.Kind {
case KindError:
x.LastN = Error[struct{}](n.Error)
case KindComplete:
x.LastN = Complete[struct{}]()
}

if x.Emitting {
if n.Kind == KindNext {
x.Queue = append(x.Queue, n.Value)
}

x.Mu.Unlock()

return
}

x.Emitting = true

x.Mu.Unlock()

throw := func(err error) {
x.Mu.Lock()
x.Emitting = false
x.LastN = Error[struct{}](err)
x.Queue = nil
x.Mu.Unlock()
x.Observer.Error(err)
}

oops := func() { throw(ErrOops) }

sink := x.Observer

switch n.Kind {
case KindNext:
select {
default:
case <-x.DoneChan:
throw(x.Context.Err())
return
}

Try1(sink, n, oops)

case KindError, KindComplete:
sink(n)
return
}

for {
x.Mu.Lock()

if x.Queue == nil {
lastn := x.LastN

x.Emitting = false
x.Mu.Unlock()

switch lastn.Kind {
case KindError:
sink.Error(lastn.Error)
case KindComplete:
sink.Complete()
}

return
}

q := x.Queue
x.Queue = nil

x.Mu.Unlock()

for _, v := range q {
select {
default:
case <-x.DoneChan:
throw(x.Context.Err())
return
}

Try1(sink, Next(v), oops)
}
}
}
}

0 comments on commit f8354a5

Please sign in to comment.