Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add error param to operation queue, 'nack' function #701

Merged
merged 1 commit into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/batch/cutter/cutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type OperationQueue interface {
// - The operations that are to be removed.
// - The 'Ack' function that must be called to commit the remove.
// - The 'Nack' function that must be called to roll back the remove.
Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error)
Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error)
// Peek returns (up to) the given number of operations from the head of the queue but does not remove them.
Peek(num uint) (operation.QueuedOperationsAtTime, error)
// Len returns the number of operation in the queue.
Expand All @@ -48,7 +48,7 @@ type Result struct {
// Ack commits the remove from the queue and returns the number of pending operations.
Ack func() uint
// Nack rolls back the remove so that a retry may occur.
Nack func()
Nack func(error)
}

// BatchCutter implements batch cutting.
Expand Down
3 changes: 2 additions & 1 deletion pkg/batch/cutter/cutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package cutter

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestBatchCutter(t *testing.T) {
require.Zero(t, result.Pending)
require.Equal(t, uint64(10), result.ProtocolVersion)

result.Nack()
result.Nack(errors.New("injected error"))

// After a rollback, the operations should still be in the queue
result, err = r.Cut(true)
Expand Down
4 changes: 2 additions & 2 deletions pkg/batch/opqueue/memqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (q *MemQueue) Peek(num uint) (operation.QueuedOperationsAtTime, error) {
}

// Remove removes (up to) the given number of items from the head of the queue.
func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) {
func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand All @@ -64,7 +64,7 @@ func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack f

return uint(len(q.items))
},
func() {
func(error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand Down
3 changes: 2 additions & 1 deletion pkg/batch/opqueue/memqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package opqueue

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestMemQueue(t *testing.T) {
require.Equal(t, *op2, ops[0].QueuedOperation)
require.Equal(t, *op3, ops[1].QueuedOperation)

nack()
nack(errors.New("injected error"))

ops, ack, _, err = q.Remove(5)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/batch/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (r *Writer) cutAndProcess(forceCut bool) (numProcessed int, pending uint, e
if err != nil {
r.logger.Error("Error processing batch operations", logfields.WithTotal(len(result.Operations)), log.WithError(err))

result.Nack()
result.Nack(err)

return 0, result.Pending + uint(len(result.Operations)), err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batch/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func TestProcessError(t *testing.T) {

q.LenReturns(1)
q.PeekReturns(invalidQueue, nil)
q.RemoveReturns(nil, func() uint { return 0 }, func() {}, nil)
q.RemoveReturns(nil, func() uint { return 0 }, func(error) {}, nil)

ctx := newMockContext()
ctx.ProtocolClient.Protocol.MaxOperationCount = 1
Expand Down
Loading