Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/consensus: fixed instance_io race #3416

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions core/consensus/utils/instance_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package utils

import (
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -17,23 +18,20 @@ const (
// NewInstanceIO returns a new instanceIO.
func NewInstanceIO[T any]() *InstanceIO[T] {
return &InstanceIO[T]{
Participated: make(chan struct{}),
Proposed: make(chan struct{}),
Running: make(chan struct{}),
RecvBuffer: make(chan T, RecvBufferSize),
HashCh: make(chan [32]byte, 1),
ValueCh: make(chan proto.Message, 1),
ErrCh: make(chan error, 1),
DecidedAtCh: make(chan time.Time, 1),
RecvBuffer: make(chan T, RecvBufferSize),
HashCh: make(chan [32]byte, 1),
ValueCh: make(chan proto.Message, 1),
ErrCh: make(chan error, 1),
DecidedAtCh: make(chan time.Time, 1),
}
}

// InstanceIO defines the async input and output channels of a
// single consensus instance in the Component.
type InstanceIO[T any] struct {
Participated chan struct{} // Closed when Participate was called for this instance.
Proposed chan struct{} // Closed when Propose was called for this instance.
Running chan struct{} // Closed when runInstance was already called.
Participated int32 // Closed when Participate was called for this instance.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change those comments, there are no channels involved in this anymore.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not an atomic.Bool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, changed as suggested.

Proposed int32 // Closed when Propose was called for this instance.
Running int32 // Closed when runInstance was already called.
RecvBuffer chan T // Outer receive buffers.
HashCh chan [32]byte // Async input hash channel.
ValueCh chan proto.Message // Async input value channel.
Expand All @@ -44,11 +42,8 @@ type InstanceIO[T any] struct {
// MarkParticipated marks the instance as participated.
// It returns an error if the instance was already marked as participated.
func (io *InstanceIO[T]) MarkParticipated() error {
select {
case <-io.Participated:
if !atomic.CompareAndSwapInt32(&io.Participated, 0, 1) {
return errors.New("already participated")
default:
close(io.Participated)
}

return nil
Expand All @@ -57,11 +52,8 @@ func (io *InstanceIO[T]) MarkParticipated() error {
// MarkProposed marks the instance as proposed.
// It returns an error if the instance was already marked as proposed.
func (io *InstanceIO[T]) MarkProposed() error {
select {
case <-io.Proposed:
if !atomic.CompareAndSwapInt32(&io.Proposed, 0, 1) {
return errors.New("already proposed")
default:
close(io.Proposed)
}

return nil
Expand All @@ -70,12 +62,5 @@ func (io *InstanceIO[T]) MarkProposed() error {
// MaybeStart returns true if the instance wasn't running and has been started by this call,
// otherwise it returns false if the instance was started in the past and is either running now or has completed.
func (io *InstanceIO[T]) MaybeStart() bool {
select {
case <-io.Running:
return false
default:
close(io.Running)
}

return true
return atomic.CompareAndSwapInt32(&io.Running, 0, 1)
}
6 changes: 5 additions & 1 deletion core/consensus/utils/instance_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func TestMaybeStart(t *testing.T) {
ok := io.MaybeStart()
require.True(t, ok)

// Second call fails.
// Subsequent calls fail.
ok = io.MaybeStart()
require.False(t, ok)
ok = io.MaybeStart()
require.False(t, ok)
ok = io.MaybeStart()
require.False(t, ok)
}
4 changes: 2 additions & 2 deletions testutil/compose/static/vouch/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM wealdtech/ethdo:1.36.1 as ethdo
FROM wealdtech/ethdo:1.35.2 as ethdo

FROM attestant/vouch:1.9.2
FROM attestant/vouch:1.9.0

COPY --from=ethdo /app/ethdo /app/ethdo

Expand Down
Loading