Skip to content

Commit

Permalink
update(*): rewrite *Complete* -> *Success*
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Mar 28, 2024
1 parent d857fa6 commit dcb6267
Show file tree
Hide file tree
Showing 180 changed files with 850 additions and 850 deletions.
20 changes: 10 additions & 10 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func (obs auditObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
sink = sink.OnLastNotification(cancel)

var x struct {
Context atomic.Value
Complete atomic.Bool
Latest struct {
Context atomic.Value
Success atomic.Bool
Latest struct {
sync.Mutex
Value T
}
Expand Down Expand Up @@ -85,18 +85,18 @@ func (obs auditObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
}
})

if x.Context.CompareAndSwap(w.Context, c.Context) && x.Complete.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
sink.Complete()
if x.Context.CompareAndSwap(w.Context, c.Context) && x.Success.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
sink.Success()
}

case KindError:
if x.Context.Swap(sentinel) != sentinel {
sink.Error(n.Error)
}

case KindComplete:
if x.Context.CompareAndSwap(w.Context, c.Context) && x.Complete.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
sink.Complete()
case KindSuccess:
if x.Context.CompareAndSwap(w.Context, c.Context) && x.Success.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
sink.Success()
}
}
})
Expand All @@ -123,8 +123,8 @@ func (obs auditObservable[T, U]) Subscribe(c Context, sink Observer[T]) {
sink(n)
}

case KindComplete:
x.Complete.Store(true)
case KindSuccess:
x.Success.Store(true)

if x.Context.CompareAndSwap(c.Context, sentinel) {
sink(n)
Expand Down
10 changes: 5 additions & 5 deletions audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestAudit(t *testing.T) {
},
),
),
"B", "D", "E", ErrComplete,
"B", "D", "E", ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
Expand All @@ -32,7 +32,7 @@ func TestAudit(t *testing.T) {
},
),
),
ErrComplete,
ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
Expand All @@ -46,7 +46,7 @@ func TestAudit(t *testing.T) {
},
),
),
ErrComplete,
ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
Expand Down Expand Up @@ -77,13 +77,13 @@ func TestAudit(t *testing.T) {
AddLatencyToValues[string](1, 2),
rx.AuditTime[string](Step(3)),
),
"B", "D", "E", ErrComplete,
"B", "D", "E", ErrSuccess,
).Case(
rx.Pipe1(
rx.Empty[string](),
rx.AuditTime[string](Step(3)),
),
ErrComplete,
ErrSuccess,
).Case(
rx.Pipe1(
rx.Throw[string](ErrTest),
Expand Down
10 changes: 5 additions & 5 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (obs Observable[T]) BlockingFirst(parent Context) (v T, err error) {
}

switch n.Kind {
case KindNext, KindError, KindComplete:
case KindNext, KindError, KindSuccess:
noop = true

switch n.Kind {
Expand Down Expand Up @@ -81,7 +81,7 @@ func (obs Observable[T]) BlockingLast(parent Context) (v T, err error) {
}

switch n.Kind {
case KindError, KindComplete:
case KindError, KindSuccess:
cancel()
}
})
Expand Down Expand Up @@ -149,7 +149,7 @@ func (obs Observable[T]) BlockingSingle(parent Context) (v T, err error) {
}

switch n.Kind {
case KindError, KindComplete:
case KindError, KindSuccess:
cancel()
}
})
Expand Down Expand Up @@ -187,7 +187,7 @@ func (obs Observable[T]) BlockingSubscribe(parent Context, sink Observer[T]) err
res = n
sink(n)
switch n.Kind {
case KindError, KindComplete:
case KindError, KindSuccess:
cancel()
}
})
Expand All @@ -203,7 +203,7 @@ func (obs Observable[T]) BlockingSubscribe(parent Context, sink Observer[T]) err
switch res.Kind {
case KindError:
return res.Error
case KindComplete:
case KindSuccess:
return nil
default:
panic("unreachable")
Expand Down
4 changes: 2 additions & 2 deletions buffercount.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (obs bufferCountObservable[T]) Subscribe(c Context, sink Observer[[]T]) {
case KindError:
sink.Error(n.Error)

case KindComplete:
case KindSuccess:
if len(s) != 0 {
for {
Try1(sink, Next(s), func() { sink.Error(ErrOops) })
Expand All @@ -93,7 +93,7 @@ func (obs bufferCountObservable[T]) Subscribe(c Context, sink Observer[[]T]) {
}
}

sink.Complete()
sink.Success()
}
})
}
10 changes: 5 additions & 5 deletions buffercount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,35 @@ func TestBufferCount(t *testing.T) {
rx.BufferCount[string](2),
ToString[[]string](),
),
"[A B]", "[C D]", "[E]", ErrComplete,
"[A B]", "[C D]", "[E]", ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3),
ToString[[]string](),
),
"[A B C]", "[D E]", ErrComplete,
"[A B C]", "[D E]", ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(1),
ToString[[]string](),
),
"[A B C]", "[B C D]", "[C D E]", "[D E]", "[E]", ErrComplete,
"[A B C]", "[B C D]", "[C D E]", "[D E]", "[E]", ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(2),
ToString[[]string](),
),
"[A B C]", "[C D E]", "[E]", ErrComplete,
"[A B C]", "[C D E]", "[E]", ErrSuccess,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(4),
ToString[[]string](),
),
"[A B C]", "[E]", ErrComplete,
"[A B C]", "[E]", ErrSuccess,
).Case(
rx.Pipe2(
rx.Throw[string](ErrTest),
Expand Down
20 changes: 10 additions & 10 deletions catch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package rx

// Catch handles errors on the source Observable by mirroring a new Observable
// returned by selector.
// Catch mirrors the source or switches to another Observable, returned from
// a call to selector, instead if the source emits an error notification.
//
// Catch does not catch context cancellations.
func Catch[T any](selector func(err error) Observable[T]) Operator[T, T] {
Expand All @@ -24,7 +24,7 @@ func Catch[T any](selector func(err error) Observable[T]) Operator[T, T] {
obs := Try11(selector, n.Error, func() { sink.Error(ErrOops) })
obs.Subscribe(c, sink)

case KindComplete:
case KindSuccess:
sink(n)
}
})
Expand Down Expand Up @@ -56,7 +56,7 @@ func OnErrorResumeWith[T any](obs Observable[T]) Operator[T, T] {

obs.Subscribe(c, sink)

case KindComplete:
case KindSuccess:
sink(n)
}
})
Expand All @@ -65,11 +65,11 @@ func OnErrorResumeWith[T any](obs Observable[T]) Operator[T, T] {
)
}

// OnErrorComplete mirrors the source Observable, or completes if the source
// emits an error notification.
// OnErrorSucceed mirrors the source Observable, or succeeds instead
// if the source emits an error notification.
//
// OnErrorComplete does not complete after context cancellation.
func OnErrorComplete[T any]() Operator[T, T] {
// OnErrorSucceed does not succeed after context cancellation.
func OnErrorSucceed[T any]() Operator[T, T] {
return NewOperator(
func(source Observable[T]) Observable[T] {
return func(c Context, sink Observer[T]) {
Expand All @@ -86,9 +86,9 @@ func OnErrorComplete[T any]() Operator[T, T] {
return
}

sink.Complete()
sink.Success()

case KindComplete:
case KindSuccess:
sink(n)
}
})
Expand Down
20 changes: 10 additions & 10 deletions catch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestCatch(t *testing.T) {
rx.Just("A", "B", "C"),
rx.Catch(f),
),
"A", "B", "C", ErrComplete,
"A", "B", "C", ErrSuccess,
).Case(
rx.Pipe1(
rx.Concat(
Expand All @@ -29,7 +29,7 @@ func TestCatch(t *testing.T) {
),
rx.Catch(f),
),
"A", "B", "C", "D", "E", ErrComplete,
"A", "B", "C", "D", "E", ErrSuccess,
).Case(
rx.Pipe1(
rx.Throw[string](ErrTest),
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestOnErrorResumeWith(t *testing.T) {
rx.Just("A", "B", "C"),
rx.OnErrorResumeWith(rx.Just("D", "E")),
),
"A", "B", "C", ErrComplete,
"A", "B", "C", ErrSuccess,
).Case(
rx.Pipe1(
rx.Concat(
Expand All @@ -67,7 +67,7 @@ func TestOnErrorResumeWith(t *testing.T) {
),
rx.OnErrorResumeWith(rx.Just("D", "E")),
),
"A", "B", "C", "D", "E", ErrComplete,
"A", "B", "C", "D", "E", ErrSuccess,
)

ctx, cancel := rx.NewBackgroundContext().WithTimeout(Step(1))
Expand All @@ -82,24 +82,24 @@ func TestOnErrorResumeWith(t *testing.T) {
)
}

func TestOnErrorComplete(t *testing.T) {
func TestOnErrorSucceed(t *testing.T) {
t.Parallel()

NewTestSuite[string](t).Case(
rx.Pipe1(
rx.Just("A", "B", "C"),
rx.OnErrorComplete[string](),
rx.OnErrorSucceed[string](),
),
"A", "B", "C", ErrComplete,
"A", "B", "C", ErrSuccess,
).Case(
rx.Pipe1(
rx.Concat(
rx.Just("A", "B", "C"),
rx.Throw[string](ErrTest),
),
rx.OnErrorComplete[string](),
rx.OnErrorSucceed[string](),
),
"A", "B", "C", ErrComplete,
"A", "B", "C", ErrSuccess,
)

ctx, cancel := rx.NewBackgroundContext().WithTimeout(Step(1))
Expand All @@ -108,7 +108,7 @@ func TestOnErrorComplete(t *testing.T) {
NewTestSuite[string](t).WithContext(ctx).Case(
rx.Pipe1(
rx.Never[string](),
rx.OnErrorComplete[string](),
rx.OnErrorSucceed[string](),
),
context.DeadlineExceeded,
)
Expand Down
2 changes: 1 addition & 1 deletion channelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Channelize[T any](join func(upstream <-chan Notification[T], downstream cha
c.Go(func() { drain(downstream) })
sink.Error(ErrOops)
})
case KindError, KindComplete:
case KindError, KindSuccess:
defer drain(downstream)
sink(n)
return
Expand Down
4 changes: 2 additions & 2 deletions channelize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestChannelize(t *testing.T) {
switch n.Kind {
case rx.KindNext:
downstream <- n
case rx.KindError, rx.KindComplete:
case rx.KindError, rx.KindSuccess:
downstream <- n
return
}
Expand All @@ -27,7 +27,7 @@ func TestChannelize(t *testing.T) {
rx.Just("A", "B", "C"),
rx.Channelize(join),
),
"A", "B", "C", ErrComplete,
"A", "B", "C", ErrSuccess,
).Case(
rx.Pipe1(
rx.Just("A", "B", "C"),
Expand Down
4 changes: 2 additions & 2 deletions combinelatest2.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func combineLatestTry2[T1, T2, R, X any](
sink.Error(n.Error)
return false

case KindComplete:
case KindSuccess:
if s.CBits |= bit; s.CBits == FullBits {
sink.Complete()
sink.Success()
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion combinelatest2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestCombineLatest2(t *testing.T) {
"[A1 B1]",
"[A2 B1]",
"[A2 B2]",
ErrComplete,
ErrSuccess,
).Case(
rx.CombineLatest2(
rx.Throw[string](ErrTest),
Expand Down
4 changes: 2 additions & 2 deletions combinelatest3.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func combineLatestTry3[T1, T2, T3, R, X any](
sink.Error(n.Error)
return false

case KindComplete:
case KindSuccess:
if s.CBits |= bit; s.CBits == FullBits {
sink.Complete()
sink.Success()
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion combinelatest3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestCombineLatest3(t *testing.T) {
"[A2 B1 C1]",
"[A2 B2 C1]",
"[A2 B2 C2]",
ErrComplete,
ErrSuccess,
).Case(
rx.CombineLatest3(
rx.Throw[string](ErrTest),
Expand Down
Loading

0 comments on commit dcb6267

Please sign in to comment.