Skip to content

Commit

Permalink
fix: close subscription resource release
Browse files Browse the repository at this point in the history
Signed-off-by: xdlbdy <xdlbdy@gmail.com>
  • Loading branch information
xdlbdy committed Apr 6, 2023
1 parent f90ea26 commit 0f9c99d
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions internal/trigger/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
ec.Extensions[primitive.XVanusEventbus] = t.subscription.RetryEventbusID.Key()
var writeAttempt int
for {
select {
case <-ctx.Done():
return
default:
}
writeAttempt++
startTime := time.Now()
_, err := api.AppendOne(ctx, t.timerEventWriter, e)
Expand All @@ -471,7 +476,9 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
if writeAttempt >= t.config.MaxWriteAttempt {
return
}
time.Sleep(time.Second)
if !util.SleepWithContext(ctx, time.Second) {
return
}
} else {
break
}
Expand All @@ -491,6 +498,11 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
ec.Extensions[primitive.DeadLetterReason] = reason
var writeAttempt int
for {
select {
case <-ctx.Done():
return
default:
}
writeAttempt++
startTime := time.Now()
_, err := api.AppendOne(ctx, t.dlEventWriter, e)
Expand All @@ -505,7 +517,9 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
if writeAttempt >= t.config.MaxWriteAttempt {
return
}
time.Sleep(time.Second)
if !util.SleepWithContext(ctx, time.Second) {
return
}
} else {
break
}
Expand Down Expand Up @@ -602,12 +616,12 @@ func (t *trigger) Stop(ctx context.Context) error {
t.reader.Close()
t.retryEventReader.Close()
t.stop()
t.pool.Release()
close(t.eventCh)
close(t.retryEventCh)
close(t.sendCh)
close(t.batchSendCh)
t.wg.Wait()
t.pool.Release()
t.offsetManager.Close()
t.state = TriggerStopped
log.Info(ctx).
Expand Down

0 comments on commit 0f9c99d

Please sign in to comment.