From a838e88e0e7c7d0b834b78cd11129b75e527ff66 Mon Sep 17 00:00:00 2001 From: ngjaying Date: Wed, 25 Dec 2024 18:39:03 +0800 Subject: [PATCH] fix(win): sliding delay lost previous input (#3471) Signed-off-by: Jiyong Huang --- internal/topo/node/window_op.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 9381c22d00..88d426c43b 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -358,7 +358,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x } } else { // clear inputs if condition not matched - inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx) + // TS add 1 to prevent remove current input + inputs = o.gcInputs(inputs, d.Timestamp.Add(1), ctx) } case ast.SESSION_WINDOW: if timeoutTicker != nil { @@ -568,7 +569,7 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl content := make([]xsql.Row, 0, len(inputs)) // Sync table left := right.Add(-length).Add(-delta) - log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", left.UnixMilli(), length, delta, left.UnixMilli()) + log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", right.UnixMilli(), length, delta, left.UnixMilli()) nextleft := -1 // Assume the inputs are sorted by timestamp for i, tuple := range inputs {