Skip to content

Commit

Permalink
fix(win): sliding delay lost previous input (#3471)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Dec 25, 2024
1 parent c64ea40 commit a838e88
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
}
} else {
// clear inputs if condition not matched
inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx)
// TS add 1 to prevent remove current input
inputs = o.gcInputs(inputs, d.Timestamp.Add(1), ctx)
}
case ast.SESSION_WINDOW:
if timeoutTicker != nil {
Expand Down Expand Up @@ -568,7 +569,7 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl
content := make([]xsql.Row, 0, len(inputs))
// Sync table
left := right.Add(-length).Add(-delta)
log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", left.UnixMilli(), length, delta, left.UnixMilli())
log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", right.UnixMilli(), length, delta, left.UnixMilli())
nextleft := -1
// Assume the inputs are sorted by timestamp
for i, tuple := range inputs {
Expand Down

0 comments on commit a838e88

Please sign in to comment.