
HashiCorp Raft node failed to make requestVote RPC: invalid byte descriptor for decoding bytes, got 0x13 #599

Open
mattrltrent opened this issue Jun 12, 2024 · 0 comments


mattrltrent commented Jun 12, 2024

Persistent bug, seemingly in msgpack

Stack Overflow version of this post (with bounty).


I'm working on a peer-to-peer network project that uses Raft for consensus and libp2p for peer-to-peer communication. The RequestVote RPC in Raft is failing with a msgpack decode error. Here are the error logs:

```
2024-06-12T23:16:22.856Z [ERROR] raft: failed to make requestVote RPC: target="{Voter QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K 127.0.0.1:8080}" error="msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13" term=17
...
```
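For reference, msgpack prefixes every value with a type byte, and the error says the decoder wanted a type it could read into a string or byte slice. The sketch below classifies a leading byte using the format table from the msgpack specification. `describeMsgpackByte` and `acceptableForBytes` are hypothetical helpers written for illustration (they are not part of `hashicorp/go-msgpack`) and only distinguish the families relevant to this error. Under that table, 0x13 is a positive fixint (the integer 19) rather than a nil, str, or bin marker.

```go
package main

import "fmt"

// describeMsgpackByte classifies the first byte of a msgpack value using the
// format table from the msgpack specification. Only the families relevant to
// this error are distinguished; everything else is reported as "other".
func describeMsgpackByte(b byte) string {
	switch {
	case b <= 0x7f:
		return fmt.Sprintf("positive fixint (%d)", b)
	case b <= 0x8f:
		return "fixmap"
	case b <= 0x9f:
		return "fixarray"
	case b <= 0xbf:
		return "fixstr"
	case b == 0xc0:
		return "nil"
	case b >= 0xc4 && b <= 0xc6:
		return "bin 8/16/32"
	case b >= 0xd9 && b <= 0xdb:
		return "str 8/16/32"
	case b >= 0xe0:
		return "negative fixint"
	default:
		return "other"
	}
}

// acceptableForBytes reports whether a value starting with b belongs to a
// family (nil, str, bin per the current spec) that a string/[]byte decoder
// could accept. Older decoders may not accept every str/bin variant.
func acceptableForBytes(b byte) bool {
	switch {
	case b == 0xc0, b >= 0xa0 && b <= 0xbf, b >= 0xc4 && b <= 0xc6, b >= 0xd9 && b <= 0xdb:
		return true
	}
	return false
}

func main() {
	for _, b := range []byte{0x13, 0xa5, 0xc4, 0xc0} {
		fmt.Printf("0x%02x: %s, ok for string/bytes: %v\n", b, describeMsgpackByte(b), acceptableForBytes(b))
	}
}
```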

Here are some verbose logs. They include output from parts of my code that I don't think are related to this issue, but I've left it in rather than pruning it:

```
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | #1 ---------------------------------------------------- here
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | Local ID: QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | RAFT ADDR: 127.0.0.1:9000
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | #3 ---------------------------------------------------- here
kurlox-client-1     | 2024-06-12T23:29:11.648Z [INFO]  raft: initial configuration: index=0 servers=[]
kurlox-client-1     | 2024-06-12T23:29:11.649Z [INFO]  raft: entering follower state: follower="Node at 127.0.0.1:9000 [Follower]" leader-address= leader-id=
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | #1 ---------------------------------------------------- here
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | Local ID: QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | RAFT ADDR: 127.0.0.1:9001
kurlox-client-1     | 2024-06-12 23:29:11 | ***DEBUG*** | #3 ---------------------------------------------------- here
kurlox-client-1     | 2024-06-12T23:29:11.649Z [INFO]  raft: initial configuration: index=0 servers=[]
kurlox-client-1     | 2024-06-12T23:29:11.649Z [INFO]  raft: entering follower state: follower="Node at 127.0.0.1:9001 [Follower]" leader-address= leader-id=
kurlox-client-1     | 2024-06-12T23:29:12.932Z [WARN]  raft: heartbeat timeout reached, starting election: last-leader-addr= last-leader-id=
kurlox-client-1     | 2024-06-12T23:29:12.932Z [INFO]  raft: entering candidate state: node="Node at 127.0.0.1:9000 [Candidate]" term=2
kurlox-client-1     | 2024-06-12T23:29:12.932Z [DEBUG] raft: voting for self: term=2 id=QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K
kurlox-client-1     | 2024-06-12T23:29:12.933Z [DEBUG] raft: asking for vote: term=2 from=QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q address=127.0.0.1:8081
kurlox-client-1     | 2024-06-12T23:29:12.933Z [DEBUG] raft: calculated votes needed: needed=2 term=2
kurlox-client-1     | 2024-06-12T23:29:12.933Z [DEBUG] raft: vote granted: from=QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K term=2 tally=1
kurlox-client-1     | 2024-06-12T23:29:12.934Z [ERROR] raft: failed to make requestVote RPC: target="{Voter QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q 127.0.0.1:8081}" error="msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13" term=2
```
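Lining these logs up: each node prints its Raft listen address (`RAFT ADDR: 127.0.0.1:9000` and `127.0.0.1:9001`), while the candidate asks for a vote at `127.0.0.1:8081`, and the first error log targets `127.0.0.1:8080`. A small cross-check like the one below makes that kind of mismatch explicit, assuming you can collect each node's Raft listen address alongside the bootstrap server list. The `server` type and `findAddressMismatches` are hypothetical stand-ins (the type only mirrors the `ID`/`Address` fields of `raft.Server`) so the snippet runs with the standard library alone.

```go
package main

import "fmt"

// server mirrors the ID/Address pair of raft.Server; it is a hypothetical
// stand-in so this sketch needs only the standard library.
type server struct {
	ID      string
	Address string
}

// findAddressMismatches returns one message for every bootstrap entry whose
// address differs from the Raft listen address that node reported, or whose
// ID has no known listen address at all.
func findAddressMismatches(bootstrap []server, raftListen map[string]string) []string {
	var problems []string
	for _, s := range bootstrap {
		want, ok := raftListen[s.ID]
		if !ok {
			problems = append(problems, fmt.Sprintf("%s: no known Raft listen address", s.ID))
			continue
		}
		if want != s.Address {
			problems = append(problems, fmt.Sprintf("%s: bootstrap address %s, Raft listens on %s", s.ID, s.Address, want))
		}
	}
	return problems
}

func main() {
	// Values taken from the logs in this post.
	raftListen := map[string]string{
		"QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K": "127.0.0.1:9000",
		"QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q": "127.0.0.1:9001",
	}
	bootstrap := []server{
		{ID: "QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K", Address: "127.0.0.1:8080"},
		{ID: "QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q", Address: "127.0.0.1:8081"},
	}
	for _, p := range findAddressMismatches(bootstrap, raftListen) {
		fmt.Println(p)
	}
}
```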

The relevant parts of my code, where I handle the FSM (finite state machine) and msgpack encoding/decoding, are as follows:

```go
package p2p

import (
    "context"
    "crypto/ed25519"
    "fmt"
    "io"
    "net"
    "regexp"
    "sync"
    "time"

    "github.com/hashicorp/go-msgpack/codec"
    "github.com/hashicorp/raft"
    "github.com/libp2p/go-libp2p"
    dht "github.com/libp2p/go-libp2p-kad-dht"
    "github.com/libp2p/go-libp2p/core/crypto"
    "github.com/libp2p/go-libp2p/core/discovery"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/peerstore"
    mdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
    routing "github.com/libp2p/go-libp2p/p2p/discovery/routing"
    tls "github.com/libp2p/go-libp2p/p2p/security/tls"
    "github.com/multiformats/go-multiaddr"
)

type FSM struct {
    mu    sync.Mutex
    state map[string]string
}

func NewFSM() *FSM {
    return &FSM{
        state: make(map[string]string),
    }
}

// Apply applies a Raft log entry to the FSM.
func (fsm *FSM) Apply(log *raft.Log) interface{} {
    fsm.mu.Lock()
    defer fsm.mu.Unlock()

    var handle codec.MsgpackHandle
    dec := codec.NewDecoderBytes(log.Data, &handle)
    var cmd map[string]string
    if err := dec.Decode(&cmd); err != nil {
        panic(fmt.Sprintf("failed to unmarshal command: %s", err))
    }

    for k, v := range cmd {
        fsm.state[k] = v
    }

    return nil
}

// Restore restores the FSM to a previous state based on a snapshot.
func (fsm *FSM) Restore(rc io.ReadCloser) error {
    fsm.mu.Lock()
    defer fsm.mu.Unlock()

    var handle codec.MsgpackHandle
    dec := codec.NewDecoder(rc, &handle)
    var state map[string]string
    if err := dec.Decode(&state); err != nil {
        return fmt.Errorf("failed to decode snapshot: %s", err)
    }

    fsm.state = state
    return nil
}

// Snapshot returns a snapshot of the FSM's state.
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
    fsm.mu.Lock()
    defer fsm.mu.Unlock()

    // Create a copy of the state to include in the snapshot
    state := make(map[string]string)
    for k, v := range fsm.state {
        state[k] = v
    }
    return &FSMSnapshot{state: state}, nil
}

// FSMSnapshot represents a snapshot of the FSM's state.
type FSMSnapshot struct {
    state map[string]string
}

// Persist saves the FSM's state to the given sink.
func (s *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
    var handle codec.MsgpackHandle
    var buf []byte
    enc := codec.NewEncoderBytes(&buf, &handle)
    if err := enc.Encode(s.state); err != nil {
        sink.Cancel()
        return fmt.Errorf("failed to marshal snapshot data: %s", err)
    }

    if _, err := sink.Write(buf); err != nil {
        sink.Cancel()
        return fmt.Errorf("failed to write snapshot data: %s", err)
    }

    if err := sink.Close(); err != nil {
        sink.Cancel()
        return fmt.Errorf("failed to close snapshot sink: %s", err)
    }

    return nil
}

// Release is called when the snapshot is no longer needed.
func (s *FSMSnapshot) Release() {}
```

Additionally, here is the part of my code where I initialize and start Raft:

```go
func initializeRaftStoresAndTransport(localID raft.ServerID, raftAddr string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, raft.Transport, error) {
    // Initialize Raft stores
    logStore := raft.NewInmemStore()    // Only for testing; not for production
    stableStore := raft.NewInmemStore() // Only for testing; not for production
    snapshotStore := raft.NewInmemSnapshotStore()

    // Initialize Raft transport with the provided address
    tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr)
    if err != nil {
        return nil, nil, nil, nil, err
    }
    transport, err := raft.NewTCPTransportWithConfig(
        tcpAddr.String(),
        tcpAddr,
        &raft.NetworkTransportConfig{
            Logger:                  nil,
            MsgpackUseNewTimeFormat: true,
        },
    )
    if err != nil {
        return nil, nil, nil, nil, err
    }

    return logStore, stableStore, snapshotStore, transport, nil
}

func (p *P2PNetwork) InitAndStartRaft(ctx context.Context, raftPeers []raft.Server) error {
    if p.Raft != nil {
        return nil // Raft already started
    }

    // Initialize Raft stores and transport with the actual address
    address := p.raftAddr
    localID := raft.ServerID(p.Host.ID().String())
    logStore, stableStore, snapshotStore, transport, err := initializeRaftStoresAndTransport(localID, address)
    if err != nil {
        return err
    }

    // Initialize FSM and Raft node
    fsm := NewFSM()
    config := raft.DefaultConfig()
    config.LocalID = localID
    raftNode, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
    if err != nil {
        return err
    }

    p.Raft = &RaftHandler{
        RaftNode:    raftNode,
        TustedNodes: make(map[string]ed25519.PublicKey),
    }

    // Convert multiaddrs to IP:Port format before bootstrapping Raft cluster
    var convertedRaftPeers []raft.Server
    for _, peer := range raftPeers {
        address, err := multiaddr.NewMultiaddr(string(peer.Address))
        if err != nil {
            return err
        }
        ipPort, err := ExtractIPAndPortFromMultiaddr(address)
        if err != nil {
            return err
        }
        convertedRaftPeers = append(convertedRaftPeers, raft.Server{
            ID:      peer.ID,
            Address: raft.ServerAddress(ipPort),
        })
    }

    if len(convertedRaftPeers) == 0 {
        return fmt.Errorf("no peers provided for Raft bootstrap")
    }

    err = raftNode.BootstrapCluster(raft.Configuration{
        Servers: convertedRaftPeers,
    }).Error()
    if err != nil {
        return err
    }

    return nil
}
```
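`ExtractIPAndPortFromMultiaddr` isn't included in the post. The sketch below is a hypothetical, string-based equivalent (`extractIPAndPort`), assuming multiaddrs of the form `/ip4/<ip>/tcp/<port>` or `/ip6/<ip>/tcp/<port>`, possibly followed by components such as `/p2p/<id>`. Whatever the real helper does, it can only return the port that appears in the multiaddr it is given.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// extractIPAndPort is a hypothetical, string-only stand-in for the post's
// ExtractIPAndPortFromMultiaddr. It walks "/proto/value" pairs, so it assumes
// every component carries a value (true for ip4, ip6, tcp, and p2p).
func extractIPAndPort(maddr string) (string, error) {
	parts := strings.Split(strings.TrimPrefix(maddr, "/"), "/")
	var host, port string
	for i := 0; i+1 < len(parts); i += 2 {
		switch parts[i] {
		case "ip4", "ip6":
			host = parts[i+1]
		case "tcp":
			port = parts[i+1]
		}
	}
	if host == "" || port == "" {
		return "", fmt.Errorf("no ip4/ip6 + tcp components in %q", maddr)
	}
	// net.JoinHostPort wraps IPv6 hosts in brackets, e.g. "[::1]:8081".
	return net.JoinHostPort(host, port), nil
}

func main() {
	addr, err := extractIPAndPort("/ip4/127.0.0.1/tcp/8081")
	if err != nil {
		panic(err)
	}
	fmt.Println(addr) // 127.0.0.1:8081
}
```

With the plain TCP transport used here, the converted address is what Raft dials for RPCs such as RequestVote, so it has to be the address the peer passed to `raft.NewTCPTransportWithConfig`, not its libp2p listen address.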

Attempted Solutions:

  • Set MsgpackUseNewTimeFormat: true, as suggested in a similar post. It didn't fix the error.
  • Made sure all nodes run the same versions of all dependencies (I'm only testing with 2 nodes locally).

Environment:

Go version: 1.21

Libraries:

Steps to Reproduce:

  1. Start the Raft cluster with multiple nodes.
  2. Nodes attempt to elect a leader.
  3. The error occurs during the RequestVote RPC.

Suspected Reasons for Issue:

I think it could be one of: a bug in the msgpack package, differing protocols between libp2p and Raft, a TCP vs. HTTP encoding issue, an invalid character being passed somewhere, or something else entirely.
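The "differing protocols" idea can be checked against the exact byte in the error. libp2p connections begin with multistream-select negotiation; assuming (worth confirming against the multistream-select spec) that each message is framed as an unsigned-varint length followed by the message and a trailing newline, the sketch below builds the frame for the `/multistream/1.0.0` header. `multistreamFrame` is a hypothetical helper, not a go-multistream API.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// multistreamFrame frames a message as a uvarint length prefix followed by
// the message and a trailing newline (the assumed multistream-select framing).
func multistreamFrame(msg string) []byte {
	payload := msg + "\n"
	frame := binary.AppendUvarint(nil, uint64(len(payload)))
	return append(frame, payload...)
}

func main() {
	frame := multistreamFrame("/multistream/1.0.0")
	fmt.Printf("first byte: 0x%02x, frame length: %d\n", frame[0], len(frame))
}
```

The header is 19 bytes including the newline, so its length prefix is the single byte 0x13. If Raft's TCP transport were dialing a libp2p listener, that would plausibly be the first byte it reads back, matching the error; this is a lead to verify, not a confirmed diagnosis.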

Question:

What could be causing "msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13", and how can I fix it?

Any help or pointers would be greatly appreciated! Let me know if more context is needed. I've spent almost an entire day trying to debug this.
