Skip to content

Commit

Permalink
feat: add trigger log
Browse files Browse the repository at this point in the history
  • Loading branch information
xdlbdy committed Oct 19, 2023
1 parent efef6d3 commit 9303a7d
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions server/trigger/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,10 @@ func (t *trigger) runRetryEventFilterTransform(ctx context.Context) {
if err != nil {
log.Info(ctx).Err(err).
Str("event_id", event.record.Event.ID()).
Interface("event_offset", event.record.OffsetInfo).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, event.record.EventlogID).
Uint64("event_offset", event.record.OffsetInfo.Offset).
Msg("event transform error")
t.writeFailEvent(ctx, record.Event, ErrTransformCode, err)
t.offsetManager.EventCommit(record.OffsetInfo)
Expand Down Expand Up @@ -321,7 +324,10 @@ func (t *trigger) runEventFilterTransform(ctx context.Context) {
if err != nil {
log.Info(ctx).Err(err).
Str("event_id", event.record.Event.ID()).
Interface("event_offset", event.record.OffsetInfo).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, event.record.EventlogID).
Uint64("event_offset", event.record.OffsetInfo.Offset).
Msg("event transform error")
t.writeFailEvent(ctx, record.Event, ErrTransformCode, err)
t.offsetManager.EventCommit(record.OffsetInfo)
Expand Down Expand Up @@ -412,7 +418,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
Int("code", r.StatusCode).
Int("count", l).
Str("event_id", events[0].record.Event.ID()).
Interface("event_offset", events[0].record.OffsetInfo).
Interface("target", t.subscription.Sink).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, events[0].record.EventlogID).
Uint64("event_offset", events[0].record.OffsetInfo.Offset).
Msg("send event fail")
code := r.StatusCode
if t.config.Ordered {
Expand Down Expand Up @@ -494,7 +504,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
Observe(time.Since(startTime).Seconds())
if err != nil {
log.Info(ctx).Err(err).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Int("attempt", writeAttempt).
Interface("event", e).
Msg("write retry event error")
Expand All @@ -508,7 +518,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
}
}
log.Debug(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Interface("event", e).
Msg("write retry event success")
}
Expand All @@ -529,7 +539,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
Observe(time.Since(startTime).Seconds())
if err != nil {
log.Info(ctx).Err(err).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Int("attempt", writeAttempt).
Interface("event", e).
Msg("write dl event error")
Expand All @@ -542,7 +552,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
}
}
log.Debug(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Interface("event", e).
Msg("write dl event success")
}
Expand Down Expand Up @@ -602,7 +612,7 @@ func (t *trigger) Init(ctx context.Context) error {

func (t *trigger) Start(ctx context.Context) error {
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger start...")
ctx, cancel := context.WithCancel(context.Background())
t.stop = cancel
Expand All @@ -623,14 +633,14 @@ func (t *trigger) Start(ctx context.Context) error {
t.wg.StartWithContext(ctx, t.runRetryEventFilterTransform)
t.state = TriggerRunning
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger started")
return nil
}

func (t *trigger) Stop(ctx context.Context) error {
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger stop...")

if t.state == TriggerStopped {
Expand All @@ -648,7 +658,7 @@ func (t *trigger) Stop(ctx context.Context) error {
t.offsetManager.Close()
t.state = TriggerStopped
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger stopped")
return nil
}
Expand Down

0 comments on commit 9303a7d

Please sign in to comment.