Skip to content

Commit

Permalink
update(Observer): refactor Serialized method
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Feb 24, 2024
1 parent 3926694 commit 95c20b9
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions observer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rx

import "sync"

// An Observer is a consumer of notifications delivered by an [Observable].
type Observer[T any] func(n Notification[T])

Expand Down Expand Up @@ -47,20 +49,59 @@ func (sink Observer[T]) OnLastNotification(f func()) Observer[T] {
// Serialized creates an Observer that passes incoming emissions to sink
// in a mutually exclusive way.
func (sink Observer[T]) Serialized() Observer[T] {
c := make(chan Observer[T], 1)
c <- sink
var x struct {
sync.Mutex
Done bool
Emitting bool
Queue []Notification[T]
}

return func(n Notification[T]) {
if sink, ok := <-c; ok {
switch n.Kind {
case KindNext:
sink(n)
c <- sink
case KindError, KindComplete:
close(c)
x.Lock()

if x.Done {
x.Unlock()
return
}

switch n.Kind {
case KindError, KindComplete:
x.Done = true
}

if x.Emitting {
x.Queue = append(x.Queue, n)
x.Unlock()
return
}

x.Emitting = true

x.Unlock()

sink(n)

switch n.Kind {
case KindError, KindComplete:
return
}

for {
x.Lock()

if x.Queue == nil {
x.Emitting = false
x.Unlock()
return
}

q := x.Queue
x.Queue = nil

x.Unlock()

for _, n := range q {
sink(n)
default: // Unknown kind.
c <- sink
}
}
}
Expand Down

0 comments on commit 95c20b9

Please sign in to comment.