Skip to content

Commit

Permalink
Overhaul of stream.Stream function signature
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfourny committed Oct 19, 2024
1 parent e175a64 commit f2cff28
Show file tree
Hide file tree
Showing 27 changed files with 573 additions and 643 deletions.
24 changes: 10 additions & 14 deletions internal/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type Store[K, V any] interface {
Size() int
Get(key K) opt.Optional[V]
Put(key K, value V)
ForEach(func(key K, value V) bool) bool
ForEachKey(func(key K) bool) bool
ForEach(func(key K, value V) bool)
ForEachKey(func(key K) bool)
}

// NewMapped creates a new Store backed by a map.
Expand Down Expand Up @@ -66,22 +66,20 @@ func (s mappedStore[K, V]) Put(key K, value V) {
s[key] = value
}

func (s mappedStore[K, V]) ForEach(yield func(K, V) bool) bool {
func (s mappedStore[K, V]) ForEach(yield func(K, V) bool) {
for k, v := range s {
if !yield(k, v) {
return false
break
}
}
return true
}

func (s mappedStore[K, V]) ForEachKey(yield func(K) bool) bool {
func (s mappedStore[K, V]) ForEachKey(yield func(K) bool) {
for k := range s {
if !yield(k) {
return false
break
}
}
return true
}

// sortedStore provides an implementation of Store using sorted slices and binary-search.
Expand Down Expand Up @@ -121,20 +119,18 @@ func (s *sortedStore[K, V]) indexOf(key K) (int, bool) {
return slices.BinarySearchFunc(s.keys, key, s.compare)
}

func (s *sortedStore[K, V]) ForEach(yield func(K, V) bool) bool {
func (s *sortedStore[K, V]) ForEach(yield func(K, V) bool) {
for i, k := range s.keys {
if !yield(k, s.values[i]) {
return false
break
}
}
return true
}

func (s *sortedStore[K, V]) ForEachKey(yield func(K) bool) bool {
func (s *sortedStore[K, V]) ForEachKey(yield func(K) bool) {
for _, k := range s.keys {
if !yield(k) {
return false
break
}
}
return true
}
4 changes: 4 additions & 0 deletions pkg/opt/none.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (n None[V]) IfPresentElse(_ func(V), f func()) bool {
return false
}

func (n None[V]) Tap(_ func(V)) Optional[V] {
return n
}

func (n None[V]) String() string {
return "None"
}
4 changes: 4 additions & 0 deletions pkg/opt/optional.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Optional[V any] interface {
// Filter returns an Optional containing the value contained in the Optional if the provided predicate returns true for that value.
// If the Optional is empty, an empty Optional is returned.
Filter(func(V) bool) Optional[V]

// Tap calls the provided function with the value contained in the Optional if the Optional is not empty.
// Returns the Optional itself for chaining.
Tap(func(V)) Optional[V]
}

// Empty returns an empty Optional (None).
Expand Down
30 changes: 30 additions & 0 deletions pkg/opt/optional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,36 @@ func TestIfPresentElse(t *testing.T) {
})
}

func TestTap(t *testing.T) {
t.Run("Some", func(t *testing.T) {
var called bool
o := Of(42)
o2 := o.Tap(func(i int) {
called = true
})
if o != o2 {
t.Errorf("expected Tap to return the Optional")
}
if !called {
t.Errorf("expected callback to be called")
}
})

t.Run("None", func(t *testing.T) {
var called bool
o := Empty[int]()
o2 := o.Tap(func(i int) {
called = true
})
if o != o2 {
t.Errorf("expected Tap to return the Optional")
}
if called {
t.Errorf("expected callback to not be called")
}
})
}

func TestString(t *testing.T) {
t.Run("Some", func(t *testing.T) {
o := Of(42)
Expand Down
5 changes: 5 additions & 0 deletions pkg/opt/some.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func (s Some[V]) IfPresentElse(f func(V), _ func()) bool {
return true
}

func (s Some[V]) Tap(f func(V)) Optional[V] {
f(s.Value)
return s
}

func (s Some[V]) String() string {
return fmt.Sprintf("Some(%#v)", s.Value)
}
1 change: 1 addition & 0 deletions pkg/ptr/pointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestRef(t *testing.T) {
p := Ref(i)
if p == nil {
t.Errorf("Ref(%v) = %v; want non-nil", i, p)
return
}
if *p != i {
t.Errorf("Ref(%v) = %v; want %v", i, p, i)
Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type OptionalCombiner[E1, E2, F any] func(E1, E2) opt.Optional[F]
// )
// out := stream.DebugString(s) // "<foo1>"
func CombineOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] {
return func(yield Consumer[F]) bool {
return func(yield Consumer[F]) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -65,12 +65,12 @@ func CombineOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Optio
e1, ok1 := <-ch1
e2, ok2 := <-ch2
if !ok1 || !ok2 {
return true
return
}

if o := combine(e1, e2); o.Present() {
if !yield(o.GetOrZero()) {
return false
return
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/stream/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Predicate[E any] func(e E) (pass bool)
// })
// out := stream.DebugString(s) // "<2>"
func Filter[E any](s Stream[E], p Predicate[E]) Stream[E] {
return func(yield Consumer[E]) bool {
return s(func(e E) bool {
return func(yield Consumer[E]) {
s(func(e E) bool {
if p(e) {
return yield(e)
}
Expand All @@ -39,9 +39,9 @@ func Filter[E any](s Stream[E], p Predicate[E]) Stream[E] {
// })
// out := stream.DebugString(s) // "<(1, 2), (3, 4)>"
func FilterIndexed[E any](s Stream[E], p func(E) bool) Stream[pair.Pair[int64, E]] {
return func(yield Consumer[pair.Pair[int64, E]]) bool {
return func(yield Consumer[pair.Pair[int64, E]]) {
var i int64
return s(func(e E) bool {
s(func(e E) bool {
i++
if p(e) {
return yield(pair.Of(i-1, e))
Expand All @@ -59,12 +59,12 @@ func FilterIndexed[E any](s Stream[E], p func(E) bool) Stream[pair.Pair[int64, E
// s := stream.Limit(stream.Of(1, 2, 3), 2)
// out := stream.DebugString(s) // "<1, 2>"
func Limit[E any](s Stream[E], n int64) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
n := n // Shadow with a copy.
if n <= 0 {
return true
return // Nothing to do.
}
return s(func(e E) bool {
s(func(e E) bool {
n--
return yield(e) && n > 0
})
Expand All @@ -79,9 +79,9 @@ func Limit[E any](s Stream[E], n int64) Stream[E] {
// s := stream.Skip(stream.Of(1, 2, 3), 2)
// out := stream.DebugString(s) // "<3>"
func Skip[E any](s Stream[E], n int64) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
n := n // Shadow with a copy.
return s(func(e E) bool {
s(func(e E) bool {
if n > 0 {
n--
return true
Expand Down Expand Up @@ -124,9 +124,9 @@ func DistinctBy[E any](s Stream[E], compare cmp.Comparer[E]) Stream[E] {
}

func distinct[E any](s Stream[E], kv kvstore.Maker[E, struct{}]) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
seen := kv()
return s(func(e E) bool {
s(func(e E) bool {
if seen.Get(e).Present() {
return true // Skip.
}
Expand Down
48 changes: 20 additions & 28 deletions pkg/stream/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ import (
// s := stream.FromSlice([]int{1, 2, 3})
// out := stream.DebugString(s) // "<1, 2, 3>"
func FromSlice[E any](s []E) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
for _, e := range s {
if !yield(e) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -32,13 +31,12 @@ func FromSlice[E any](s []E) Stream[E] {
// s := stream.FromSliceBackwards([]int{1, 2, 3})
// out := stream.DebugString(s) // "<3, 2, 1>"
func FromSliceBackwards[E any](s []E) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
for i := len(s) - 1; i >= 0; i-- {
if !yield(s[i]) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -51,13 +49,12 @@ func FromSliceBackwards[E any](s []E) Stream[E] {
// s := stream.FromSliceWithIndex([]int{1, 2, 3})
// out := stream.DebugString(s) // "<(0, 1), (1, 2), (2, 3)>"
func FromSliceWithIndex[E any](s []E) Stream[pair.Pair[int, E]] {
return func(yield Consumer[pair.Pair[int, E]]) bool {
return func(yield Consumer[pair.Pair[int, E]]) {
for i, e := range s {
if !yield(pair.Of(i, e)) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -70,13 +67,12 @@ func FromSliceWithIndex[E any](s []E) Stream[pair.Pair[int, E]] {
// s := stream.FromSliceWithIndexBackwards([]int{1, 2, 3})
// out := stream.DebugString(s) // "<(2, 3), (1, 2), (0, 1)>"
func FromSliceWithIndexBackwards[E any](s []E) Stream[pair.Pair[int, E]] {
return func(yield Consumer[pair.Pair[int, E]]) bool {
return func(yield Consumer[pair.Pair[int, E]]) {
for i := len(s) - 1; i >= 0; i-- {
if !yield(pair.Of(i, s[i])) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -89,13 +85,12 @@ func FromSliceWithIndexBackwards[E any](s []E) Stream[pair.Pair[int, E]] {
// s := stream.FromMap(map[int]string{1: "foo", 2: "bar"})
// out := stream.DebugString(s) // "<(1, foo), (2, bar)>"
func FromMap[K comparable, V any](m map[K]V) Stream[pair.Pair[K, V]] {
return func(yield Consumer[pair.Pair[K, V]]) bool {
return func(yield Consumer[pair.Pair[K, V]]) {
for k, v := range m {
if !yield(pair.Of(k, v)) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -107,13 +102,12 @@ func FromMap[K comparable, V any](m map[K]V) Stream[pair.Pair[K, V]] {
// s := stream.FromMapKeys(map[int]string{1: "foo", 2: "bar"})
// out := stream.DebugString(s) // "<1, 2>" // Order not guaranteed.
func FromMapKeys[K comparable, V any](m map[K]V) Stream[K] {
return func(yield Consumer[K]) bool {
return func(yield Consumer[K]) {
for k := range m {
if !yield(k) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -125,13 +119,12 @@ func FromMapKeys[K comparable, V any](m map[K]V) Stream[K] {
// s := stream.FromMapValues(map[int]string{1: "foo", 2: "bar"})
// out := stream.DebugString(s) // "<foo, bar>" // Order not guaranteed.
func FromMapValues[K comparable, V any](m map[K]V) Stream[V] {
return func(yield Consumer[V]) bool {
return func(yield Consumer[V]) {
for _, v := range m {
if !yield(v) {
return false
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -151,13 +144,12 @@ func FromMapValues[K comparable, V any](m map[K]V) Stream[V] {
// s := stream.FromChannel(ch)
// out := stream.DebugString(s) // "<1, 2>"
func FromChannel[E any](ch <-chan E) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
for e := range ch {
if !yield(e) {
return false // Consumer saw enough.
break // Consumer saw enough.
}
}
return true
}
}

Expand All @@ -176,18 +168,18 @@ func FromChannel[E any](ch <-chan E) Stream[E] {
// s := stream.FromChannelCtx(ctx, ch)
// out := stream.DebugString(s) // "<1, 2>"
func FromChannelCtx[E any](ctx context.Context, ch <-chan E) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
for {
select {
case e, ok := <-ch:
if !ok {
return true
return // Channel closed.
}
if !yield(e) {
return false
return // Consumer saw enough.
}
case <-ctx.Done():
return true
return // Context done.
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Generator[E any] func() E
// s := stream.Generate(func() int { return 1 })
// out := stream.DebugString(s) // "<1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...>"
func Generate[E any](next Generator[E]) Stream[E] {
return func(yield Consumer[E]) bool {
return func(yield Consumer[E]) {
for {
if !yield(next()) {
return false
return
}
}
}
Expand Down
Loading

0 comments on commit f2cff28

Please sign in to comment.