Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How can I get the status of all nodes? #598

Open
zey1996 opened this issue Jun 9, 2024 · 0 comments
Open

How can I get the status of all nodes? #598

zey1996 opened this issue Jun 9, 2024 · 0 comments

Comments

@zey1996
Copy link

zey1996 commented Jun 9, 2024

I am not very familiar with Raft, but I want to use it to solve my problem, and I have some questions.

  1. Can I get the cluster status? I want to be notified when the cluster changes and to see the status of each node, but I don't know how to do that. Do I have to implement it myself?
  2. I can use raft.Apply to send a log entry to all nodes, and I noticed that Apply in the FSM can return an interface value. May I use it? If I have several nodes, how can I get the responses from all of them?

Thank you for your reply!

This is my demo code:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
	"os"
	"strconv"
	"time"

	"github.com/hashicorp/raft"
)

// ra is the process-wide Raft node. main assigns it from raft.NewRaft, and the
// goroutines started by main read it to query state and submit log entries.
var ra *raft.Raft

// item presumably pairs a node identifier with its network address.
// NOTE(review): nothing in the code shown references this type.
type item struct {
	ID, Addr string
}

// main starts one node of a fixed three-node Raft cluster on localhost.
//
// The first command-line argument is the index (0, 1 or 2) of the entry in the
// hard-coded server list that this process runs as. All stores are in-memory,
// so every start bootstraps the cluster configuration from scratch.
//
// After start-up main launches two goroutines and then blocks forever:
//   - every 10 seconds, print the local Raft state, submit a "set" command
//     if this node is the leader, and print the current server list;
//   - print every notification delivered on ra.LeaderCh().
func main() {
	items := []raft.Server{
		{
			ID:      "node0",
			Address: "localhost:12000",
		},
		{
			ID:      "node1",
			Address: "localhost:12001",
		},
		{
			ID:      "node2",
			Address: "localhost:12002",
		},
	}

	// Validate the node index up front instead of panicking on a missing
	// argument or an out-of-range slice index.
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "usage: pass the node index (0-%d) as the first argument\n", len(items)-1)
		os.Exit(2)
	}
	idx, err := strconv.Atoi(os.Args[1])
	if err != nil || idx < 0 || idx >= len(items) {
		fmt.Fprintf(os.Stderr, "invalid node index %q: want an integer in [0, %d]\n", os.Args[1], len(items)-1)
		os.Exit(2)
	}
	local := items[idx]

	fmt.Println(
		"localID:", local.ID,
		"raftAddr:", local.Address,
	)
	addr, err := net.ResolveTCPAddr("tcp", string(local.Address))
	if err != nil {
		panic(err)
	}

	config := &raft.Config{
		ProtocolVersion:    raft.ProtocolVersionMax,
		HeartbeatTimeout:   1000 * time.Millisecond,
		ElectionTimeout:    1000 * time.Millisecond,
		CommitTimeout:      50 * time.Millisecond,
		MaxAppendEntries:   64,
		ShutdownOnRemove:   true,
		TrailingLogs:       10240,
		SnapshotInterval:   120 * time.Second,
		SnapshotThreshold:  8192,
		LeaderLeaseTimeout: 1000 * time.Millisecond,
		LogLevel:           "INFO",
		LocalID:            local.ID,
	}
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	s := &Store{
		RaftBind: string(local.Address),
	}

	snapshots := raft.NewInmemSnapshotStore()

	transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
	if err != nil {
		panic(err)
	}
	ra, err = raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport)
	if err != nil {
		panic(err)
	}
	configuration := raft.Configuration{
		Servers: items,
	}
	fmt.Println("transport.LocalAddr()", transport.LocalAddr())

	// BootstrapCluster returns a future; its error is only visible through
	// Error(). Report it but keep running so the node can still participate.
	if err := ra.BootstrapCluster(configuration).Error(); err != nil {
		fmt.Println("bootstrap cluster:", err)
	}

	go func() {
		tick := time.NewTicker(time.Second * 10)
		defer tick.Stop()
		for t := range tick.C {
			// Read the state once so the printed value and the leader check agree.
			state := ra.State()
			fmt.Println("I am leader:", state.String())
			if state == raft.Leader {
				proposeTick(t)
			}

			// Configuration() must not be called before Error() has returned.
			cf := ra.GetConfiguration()
			if err := cf.Error(); err != nil {
				fmt.Println(err)
				continue
			}
			fmt.Println("Servers:", cf.Configuration().Servers)
		}
	}()

	go func() {
		// LeaderCh only reports this node gaining (true) or losing (false)
		// leadership; it does not fire for changes among other nodes.
		for isLeader := range ra.LeaderCh() {
			// LeaderWithID returns the leader's address first, then its ID.
			leaderAddr, leaderID := ra.LeaderWithID()
			fmt.Println("leader change:", isLeader, leaderAddr, leaderID)
		}
	}()

	select {}
}

// proposeTick submits a "set" command carrying t through Raft and prints the
// FSM response, or prints the error if marshalling or applying fails.
func proposeTick(t time.Time) {
	c := &command{
		Op:    "set",
		Key:   "key",
		Value: t.String(),
	}
	data, err := json.Marshal(c)
	if err != nil {
		fmt.Println(err)
		return
	}
	fu := ra.Apply(data, time.Second)
	if err := fu.Error(); err != nil {
		fmt.Println(err)
		// Response is only meaningful after a successful Apply.
		return
	}
	fmt.Println(fu.Response())
}

type (
	// Store carries the per-node data that the state machine needs.
	Store struct {
		// RaftBind is the address string main passes to the TCP transport;
		// fsm.Apply echoes it back in its response.
		RaftBind string
		// raft is the consensus mechanism.
		// NOTE(review): never assigned in the code shown; main uses the
		// package-level ra instead.
		raft *raft.Raft
	}

	// fsm is Store under a separate name, so the Raft FSM methods are kept
	// off Store's own method set. main converts with (*fsm)(s).
	fsm Store
)

// command is the JSON payload carried in Raft log entries. main marshals one
// and hands it to ra.Apply; fsm.Apply unmarshals it and also returns a command
// as its response. Empty fields are omitted from the encoded JSON.
type command struct {
	// Op names the operation; the code shown uses "set" and "response".
	Op    string `json:"op,omitempty"`
	// Key is set to "key" by main; fsm.Apply does not read it.
	Key   string `json:"key,omitempty"`
	// Value is the payload: a timestamp string in requests, the node's
	// RaftBind address in responses.
	Value string `json:"value,omitempty"`
}

// Apply decodes a committed Raft log entry as a JSON command, prints its Op and
// Value, and returns a "response" command whose Value is this node's RaftBind.
//
// If l.Data cannot be decoded, the decode error is returned as the response
// value rather than panicking: a single malformed entry would otherwise crash
// the node each time the entry is applied. Callers see the error through
// ApplyFuture.Response(), not ApplyFuture.Error().
func (f *fsm) Apply(l *raft.Log) interface{} {
	var c command
	if err := json.Unmarshal(l.Data, &c); err != nil {
		return fmt.Errorf("unmarshal command: %w", err)
	}

	fmt.Println("OP:", c.Op, c.Value)
	return &command{Op: "response", Value: f.RaftBind}
}

// fsmSnapshot is the snapshot type returned by fsm.Snapshot. It has no fields,
// so a snapshot carries no data.
type fsmSnapshot struct{}

// Persist writes nothing to sink and closes it, returning the close error.
// The raft FSMSnapshot contract asks Persist to call sink.Close() when it is
// finished, even when there is no state to write.
func (f fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	return sink.Close()
}

// Release is a no-op: the snapshot holds no resources.
func (f fsmSnapshot) Release() {}

// Snapshot hands back a fresh, empty fsmSnapshot and never fails; no state
// from f is captured.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	snap := new(fsmSnapshot)
	return snap, nil
}

// Restore is a no-op placeholder: it neither reads nor closes rc, leaves f
// unchanged, and always returns nil.
func (f *fsm) Restore(rc io.ReadCloser) error {
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant