diff --git a/server/trigger/config.go b/server/trigger/config.go index fce8e3643..3a336864d 100644 --- a/server/trigger/config.go +++ b/server/trigger/config.go @@ -38,5 +38,7 @@ type Config struct { // var client read event from segment batch size. PullEventBatchSize int `yaml:"pull_event_batch_size"` // max uack event number - MaxUACKEventNumber int `yaml:"max_uack_event_number"` + MaxUACKEventNumber int `yaml:"max_uack_event_number"` + DisableDeadLetter *bool `yaml:"disable_dead_letter"` + OrderEvent *bool `yaml:"order_event"` } diff --git a/server/trigger/trigger/trigger.go b/server/trigger/trigger/trigger.go index 42a854350..42262fd30 100644 --- a/server/trigger/trigger/trigger.go +++ b/server/trigger/trigger/trigger.go @@ -499,7 +499,9 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i for { writeAttempt++ startTime := time.Now() - _, err := api.AppendOne(ctx, t.timerEventWriter, e) + timeoutCtx, cancel := context.WithTimeout(ctx, t.getConfig().DeliveryTimeout) + _, err := api.AppendOne(timeoutCtx, t.timerEventWriter, e) + cancel() metrics.TriggerRetryEventAppendSecond.WithLabelValues(t.subscriptionIDStr). Observe(time.Since(startTime).Seconds()) if err != nil { @@ -534,7 +536,9 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso for { writeAttempt++ startTime := time.Now() - _, err := api.AppendOne(ctx, t.dlEventWriter, e) + timeoutCtx, cancel := context.WithTimeout(ctx, t.getConfig().DeliveryTimeout) + _, err := api.AppendOne(timeoutCtx, t.dlEventWriter, e) + cancel() metrics.TriggerDeadLetterEventAppendSecond.WithLabelValues(t.subscriptionIDStr). Observe(time.Since(startTime).Seconds()) if err != nil { diff --git a/server/trigger/worker.go b/server/trigger/worker.go index 866c4c795..22c6e4e38 100644 --- a/server/trigger/worker.go +++ b/server/trigger/worker.go @@ -253,11 +253,20 @@ func (w *worker) getAllSubscriptionInfo(ctx context.Context) []*metapb.Subscript func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigger.Option { opts := []trigger.Option{trigger.WithControllers(w.config.ControllerAddr)} config := subscription.Config + // todo use subscription config replace global config + disableDeadLetter := config.DisableDeadLetter + if w.config.DisableDeadLetter != nil { + disableDeadLetter = *w.config.DisableDeadLetter + } + orderEvent := config.OrderedEvent + if w.config.OrderEvent != nil { + orderEvent = *w.config.OrderEvent + } opts = append(opts, trigger.WithRateLimit(config.RateLimit), trigger.WithDeliveryTimeout(config.DeliveryTimeout), trigger.WithMaxRetryAttempts(config.GetMaxRetryAttempts()), - trigger.WithDisableDeadLetter(config.DisableDeadLetter), - trigger.WithOrdered(config.OrderedEvent), + trigger.WithDisableDeadLetter(disableDeadLetter), + trigger.WithOrdered(orderEvent), trigger.WithGoroutineSize(w.config.SendEventGoroutineSize), trigger.WithSendBatchSize(w.config.SendEventBatchSize), trigger.WithPullBatchSize(w.config.PullEventBatchSize),