Skip to content

Commit

Permalink
Implement OverrideLeader & LeaderOverriden method for conductor
Browse files Browse the repository at this point in the history
  • Loading branch information
0x00101010 committed Oct 14, 2024
1 parent e7085e5 commit 2c048d1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 31 deletions.
40 changes: 29 additions & 11 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ func NewOpConductor(
oc.loopActionFn = oc.loopAction

// explicitly set all atomic.Bool values
oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus.
oc.healthy.Store(true) // default to healthy unless reported otherwise by health monitor.
oc.seqActive.Store(false) // explicitly set to false by default, the real value will be reported after sequencer control initialization.
oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus.
oc.leaderOverride.Store(false) // default to no override.
oc.healthy.Store(true) // default to healthy unless reported otherwise by health monitor.
oc.seqActive.Store(false) // explicitly set to false by default, the real value will be reported after sequencer control initialization.
oc.paused.Store(cfg.Paused)
oc.stopped.Store(false)

Expand Down Expand Up @@ -287,11 +288,12 @@ type OpConductor struct {
cons consensus.Consensus
hmon health.HealthMonitor

leader atomic.Bool
seqActive atomic.Bool
healthy atomic.Bool
hcerr error // error from health check
prevState *state
leader atomic.Bool
leaderOverride atomic.Bool
seqActive atomic.Bool
healthy atomic.Bool
hcerr error // error from health check
prevState *state

healthUpdateCh <-chan error
leaderUpdateCh <-chan bool
Expand Down Expand Up @@ -472,13 +474,29 @@ func (oc *OpConductor) HTTPEndpoint() string {
return fmt.Sprintf("http://%s", oc.rpcServer.Endpoint())
}

func (oc *OpConductor) OverrideLeader(_ context.Context, override bool) {
oc.leaderOverride.Store(override)
}

func (oc *OpConductor) LeaderOverridden(_ context.Context) bool {
return oc.leaderOverride.Load()
}

// Leader returns true if OpConductor is the leader.
func (oc *OpConductor) Leader(_ context.Context) bool {
return oc.cons.Leader()
func (oc *OpConductor) Leader(ctx context.Context) bool {
return oc.LeaderOverridden(ctx) || oc.cons.Leader()
}

// LeaderWithID returns the current leader's server ID and address.
func (oc *OpConductor) LeaderWithID(_ context.Context) *consensus.ServerInfo {
func (oc *OpConductor) LeaderWithID(ctx context.Context) *consensus.ServerInfo {
if oc.LeaderOverridden(ctx) {
return &consensus.ServerInfo{
ID: "N/A (Leader overridden)",
Addr: "N/A",
Suffrage: 0,
}
}

return oc.cons.LeaderWithID()
}

Expand Down
8 changes: 6 additions & 2 deletions op-conductor/rpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer

// API defines the interface for the op-conductor API.
type API interface {
// OverrideLeader is used to override the leader status, this is only used to return true for Leader() & LeaderWithID() calls.
// OverrideLeader is used to override or clear override for the leader status.
// It does not impact the actual raft consensus leadership status. It is supposed to be used when the cluster is unhealthy
// and the node is the only one up, to allow batcher to be able to connect to the node, so that it could download blocks from the manually started sequencer.
OverrideLeader(ctx context.Context) error
// override: true => force current conductor to be treated as leader regardless of the actual leadership status in raft.
// override: false => clear the override, return the actual leadership status in raft.
OverrideLeader(ctx context.Context, override bool) error
// LeaderOverridden returns true if the leader status is overridden.
LeaderOverridden(ctx context.Context) (bool, error)
// Pause pauses op-conductor.
Pause(ctx context.Context) error
// Resume resumes op-conductor.
Expand Down
27 changes: 12 additions & 15 deletions op-conductor/rpc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpc

import (
"context"
"sync/atomic"

"github.com/ethereum/go-ethereum/log"

Expand All @@ -11,6 +10,8 @@ import (
)

type conductor interface {
OverrideLeader(ctx context.Context, override bool)
LeaderOverridden(ctx context.Context) bool
Pause(ctx context.Context) error
Resume(ctx context.Context) error
Stop(ctx context.Context) error
Expand All @@ -32,9 +33,8 @@ type conductor interface {
// APIBackend is the backend implementation of the API.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/45) Add metrics tracer here.
type APIBackend struct {
log log.Logger
con conductor
leaderOverride atomic.Bool
log log.Logger
con conductor
}

// NewAPIBackend creates a new APIBackend instance.
Expand All @@ -48,11 +48,16 @@ func NewAPIBackend(log log.Logger, con conductor) *APIBackend {
var _ API = (*APIBackend)(nil)

// OverrideLeader implements API.
func (api *APIBackend) OverrideLeader(ctx context.Context) error {
api.leaderOverride.Store(true)
func (api *APIBackend) OverrideLeader(ctx context.Context, override bool) error {
api.con.OverrideLeader(ctx, override)
return nil
}

// LeaderOverridden implements API.
func (api *APIBackend) LeaderOverridden(ctx context.Context) (bool, error) {
return api.con.LeaderOverridden(ctx), nil
}

// Paused implements API.
func (api *APIBackend) Paused(ctx context.Context) (bool, error) {
return api.con.Paused(), nil
Expand Down Expand Up @@ -90,19 +95,11 @@ func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.Exe

// Leader implements API, returns true if current conductor is leader of the cluster.
func (api *APIBackend) Leader(ctx context.Context) (bool, error) {
return api.leaderOverride.Load() || api.con.Leader(ctx), nil
return api.con.Leader(ctx), nil
}

// LeaderWithID implements API, returns the leader's server ID and address (not necessarily the current conductor).
func (api *APIBackend) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) {
if api.leaderOverride.Load() {
return &consensus.ServerInfo{
ID: "N/A (Leader overridden)",
Addr: "N/A",
Suffrage: 0,
}, nil
}

return api.con.LeaderWithID(ctx), nil
}

Expand Down
11 changes: 9 additions & 2 deletions op-conductor/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ func prefixRPC(method string) string {
}

// OverrideLeader implements API.
func (c *APIClient) OverrideLeader(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("overrideLeader"))
func (c *APIClient) OverrideLeader(ctx context.Context, override bool) error {
return c.c.CallContext(ctx, nil, prefixRPC("overrideLeader"), override)
}

// LeaderOverridden implements API.
func (c *APIClient) LeaderOverridden(ctx context.Context) (bool, error) {
var overridden bool
err := c.c.CallContext(ctx, &overridden, prefixRPC("leaderOverridden"))
return overridden, err
}

// Paused implements API.
Expand Down
39 changes: 38 additions & 1 deletion op-e2e/system/conductor/sequencer_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-conductor/consensus"
Expand Down Expand Up @@ -232,9 +233,45 @@ func TestSequencerFailover_DisasterRecovery_OverrideLeader(t *testing.T) {
require.NoError(t, err)
require.True(t, active, "Expected sequencer to be active")

err = conductors[Sequencer3Name].client.OverrideLeader(ctx)
err = conductors[Sequencer3Name].client.OverrideLeader(ctx, true)
require.NoError(t, err)
leader, err := conductors[Sequencer3Name].client.Leader(ctx)
require.NoError(t, err)
require.True(t, leader, "Expected conductor to return leader true after override")
overridden, err := conductors[Sequencer3Name].client.LeaderOverridden(ctx)
require.NoError(t, err)
require.True(t, overridden, "Expected conductor to return leader overridden true after override")

// make sure all proxied method are working correctly.
proxy, err := rpc.DialContext(ctx, conductors[Sequencer3Name].RPCEndpoint())
require.NoError(t, err)
err = proxy.CallContext(ctx, &active, "admin_sequencerActive")
require.NoError(t, err)
require.True(t, active, "Expected sequencer to be active")
err = proxy.CallContext(ctx, nil, "optimism_syncStatus")
require.NoError(t, err)
var block map[string]any
err = proxy.CallContext(ctx, &block, "eth_getBlockByNumber", "latest", false)
require.NoError(t, err)
err = proxy.CallContext(ctx, nil, "optimism_outputAtBlock", block["number"])
require.NoError(t, err)
err = proxy.CallContext(ctx, nil, "optimism_rollupConfig")
require.NoError(t, err)

err = conductors[Sequencer3Name].client.OverrideLeader(ctx, false)
require.NoError(t, err)
overridden, err = conductors[Sequencer3Name].client.LeaderOverridden(ctx)
require.NoError(t, err)
require.False(t, overridden, "Expected conductor to return leader overridden false after override")

err = proxy.CallContext(ctx, &active, "admin_sequencerActive")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get active status")
err = proxy.CallContext(ctx, nil, "optimism_syncStatus")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get sync status")
err = proxy.CallContext(ctx, nil, "eth_getBlockByNumber", "latest", false)
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get block by number")
err = proxy.CallContext(ctx, nil, "optimism_outputAtBlock", block["number"])
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get output at block")
err = proxy.CallContext(ctx, nil, "optimism_rollupConfig")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get rollup config")
}

0 comments on commit 2c048d1

Please sign in to comment.