diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 9381c22d0..88d426c43 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -358,7 +358,8 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x } } else { // clear inputs if condition not matched - inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx) + // TS add 1 to prevent remove current input + inputs = o.gcInputs(inputs, d.Timestamp.Add(1), ctx) } case ast.SESSION_WINDOW: if timeoutTicker != nil { @@ -568,7 +569,7 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl content := make([]xsql.Row, 0, len(inputs)) // Sync table left := right.Add(-length).Add(-delta) - log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", left.UnixMilli(), length, delta, left.UnixMilli()) + log.Debugf("triggerTime: %d, length: %d, delta: %d, leftmost: %d", right.UnixMilli(), length, delta, left.UnixMilli()) nextleft := -1 // Assume the inputs are sorted by timestamp for i, tuple := range inputs {