Skip to content

Commit

Permalink
update(CombineLatest*): do not start goroutines for subscribing obser…
Browse files Browse the repository at this point in the history
…vables
  • Loading branch information
b97tsk committed Apr 1, 2024
1 parent 1ae2cc8 commit ae42a82
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 54 deletions.
7 changes: 4 additions & 3 deletions combinelatest2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ func CombineLatest2[T1, T2, R any](
chan1 := make(chan Notification[T1])
chan2 := make(chan Notification[T2])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })

c.Go(func() {
var s combineLatestState2[T1, T2]

Expand All @@ -36,6 +33,10 @@ func CombineLatest2[T1, T2, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop)
}
}

Expand Down
9 changes: 5 additions & 4 deletions combinelatest3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ func CombineLatest3[T1, T2, T3, R any](
chan2 := make(chan Notification[T2])
chan3 := make(chan Notification[T3])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })

c.Go(func() {
var s combineLatestState3[T1, T2, T3]

Expand All @@ -41,6 +37,11 @@ func CombineLatest3[T1, T2, T3, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop)
}
}

Expand Down
11 changes: 6 additions & 5 deletions combinelatest4.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ func CombineLatest4[T1, T2, T3, T4, R any](
chan3 := make(chan Notification[T3])
chan4 := make(chan Notification[T4])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })

c.Go(func() {
var s combineLatestState4[T1, T2, T3, T4]

Expand All @@ -46,6 +41,12 @@ func CombineLatest4[T1, T2, T3, T4, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop)
}
}

Expand Down
13 changes: 7 additions & 6 deletions combinelatest5.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
chan4 := make(chan Notification[T4])
chan5 := make(chan Notification[T5])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })
c.Go(func() { obs5.Subscribe(c, channelObserver(chan5, noop)) })

c.Go(func() {
var s combineLatestState5[T1, T2, T3, T4, T5]

Expand All @@ -51,6 +45,13 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop) &&
channelSubscribe(c, obs5, chan5, noop)
}
}

Expand Down
15 changes: 8 additions & 7 deletions combinelatest6.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
chan5 := make(chan Notification[T5])
chan6 := make(chan Notification[T6])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })
c.Go(func() { obs5.Subscribe(c, channelObserver(chan5, noop)) })
c.Go(func() { obs6.Subscribe(c, channelObserver(chan6, noop)) })

c.Go(func() {
var s combineLatestState6[T1, T2, T3, T4, T5, T6]

Expand All @@ -56,6 +49,14 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop) &&
channelSubscribe(c, obs5, chan5, noop) &&
channelSubscribe(c, obs6, chan6, noop)
}
}

Expand Down
17 changes: 9 additions & 8 deletions combinelatest7.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any](
chan6 := make(chan Notification[T6])
chan7 := make(chan Notification[T7])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })
c.Go(func() { obs5.Subscribe(c, channelObserver(chan5, noop)) })
c.Go(func() { obs6.Subscribe(c, channelObserver(chan6, noop)) })
c.Go(func() { obs7.Subscribe(c, channelObserver(chan7, noop)) })

c.Go(func() {
var s combineLatestState7[T1, T2, T3, T4, T5, T6, T7]

Expand All @@ -61,6 +53,15 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop) &&
channelSubscribe(c, obs5, chan5, noop) &&
channelSubscribe(c, obs6, chan6, noop) &&
channelSubscribe(c, obs7, chan7, noop)
}
}

Expand Down
19 changes: 10 additions & 9 deletions combinelatest8.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
chan7 := make(chan Notification[T7])
chan8 := make(chan Notification[T8])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })
c.Go(func() { obs5.Subscribe(c, channelObserver(chan5, noop)) })
c.Go(func() { obs6.Subscribe(c, channelObserver(chan6, noop)) })
c.Go(func() { obs7.Subscribe(c, channelObserver(chan7, noop)) })
c.Go(func() { obs8.Subscribe(c, channelObserver(chan8, noop)) })

c.Go(func() {
var s combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8]

Expand All @@ -66,6 +57,16 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop) &&
channelSubscribe(c, obs5, chan5, noop) &&
channelSubscribe(c, obs6, chan6, noop) &&
channelSubscribe(c, obs7, chan7, noop) &&
channelSubscribe(c, obs8, chan8, noop)
}
}

Expand Down
21 changes: 11 additions & 10 deletions combinelatest9.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
chan8 := make(chan Notification[T8])
chan9 := make(chan Notification[T9])

c.Go(func() { obs1.Subscribe(c, channelObserver(chan1, noop)) })
c.Go(func() { obs2.Subscribe(c, channelObserver(chan2, noop)) })
c.Go(func() { obs3.Subscribe(c, channelObserver(chan3, noop)) })
c.Go(func() { obs4.Subscribe(c, channelObserver(chan4, noop)) })
c.Go(func() { obs5.Subscribe(c, channelObserver(chan5, noop)) })
c.Go(func() { obs6.Subscribe(c, channelObserver(chan6, noop)) })
c.Go(func() { obs7.Subscribe(c, channelObserver(chan7, noop)) })
c.Go(func() { obs8.Subscribe(c, channelObserver(chan8, noop)) })
c.Go(func() { obs9.Subscribe(c, channelObserver(chan9, noop)) })

c.Go(func() {
var s combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9]

Expand Down Expand Up @@ -71,6 +61,17 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
}
}
})

_ = true &&
channelSubscribe(c, obs1, chan1, noop) &&
channelSubscribe(c, obs2, chan2, noop) &&
channelSubscribe(c, obs3, chan3, noop) &&
channelSubscribe(c, obs4, chan4, noop) &&
channelSubscribe(c, obs5, chan5, noop) &&
channelSubscribe(c, obs6, chan6, noop) &&
channelSubscribe(c, obs7, chan7, noop) &&
channelSubscribe(c, obs8, chan8, noop) &&
channelSubscribe(c, obs9, chan9, noop)
}
}

Expand Down
15 changes: 13 additions & 2 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,22 @@ func resistReentrance(f func()) func() {
}
}

func channelObserver[T any](c chan<- Notification[T], noop <-chan struct{}) Observer[T] {
func channelObserver[T any](ch chan<- Notification[T], noop <-chan struct{}) Observer[T] {
return func(n Notification[T]) {
select {
case c <- n:
case ch <- n:
case <-noop:
}
}
}

func channelSubscribe[T any](c Context, obs Observable[T], ch chan<- Notification[T], noop <-chan struct{}) bool {
obs.Subscribe(c, channelObserver(ch, noop))

select {
case <-noop:
return false
default:
return true
}
}

0 comments on commit ae42a82

Please sign in to comment.