From 9286ee68db50a5074d3b76da1f79bd85a77e0982 Mon Sep 17 00:00:00 2001 From: kevburnsjr Date: Sat, 9 Mar 2024 05:16:41 -0800 Subject: [PATCH] Add shard client --- _example/kv1/controller.go | 131 --------------------------------- _example/kv1/handler.go | 18 +++-- _example/kv1/main.go | 17 ++--- agent.go | 28 ++++++-- agent_integration_test.go | 32 ++++++++- go.mod | 1 + go.sum | 2 + grpc_server.go | 4 ++ client.go => host_client.go | 36 +++++++--- shard_client.go | 76 ++++++++++++++++++++ shard_client_manager.go | 139 ++++++++++++++++++++++++++++++++++++ shard_client_option.go | 10 +++ shard_controller_test.go | 2 +- 13 files changed, 331 insertions(+), 165 deletions(-) delete mode 100644 _example/kv1/controller.go rename client.go => host_client.go (55%) create mode 100644 shard_client.go create mode 100644 shard_client_manager.go create mode 100644 shard_client_option.go diff --git a/_example/kv1/controller.go b/_example/kv1/controller.go deleted file mode 100644 index 43d7ea9..0000000 --- a/_example/kv1/controller.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "bytes" - "context" - "log" - "math/rand" - "sync" - "time" - - "github.com/logbn/zongzi" -) - -type controller struct { - agent *zongzi.Agent - ctx context.Context - ctxCancel context.CancelFunc - members []*zongzi.Client - clients []*zongzi.Client - mutex sync.RWMutex - wg sync.WaitGroup - shard zongzi.Shard - index uint64 - lastHostID string -} - -func newController() *controller { - return &controller{} -} - -func (c *controller) Start(agent *zongzi.Agent) (err error) { - c.mutex.Lock() - c.agent = agent - c.ctx, c.ctxCancel = context.WithCancel(context.Background()) - go func() { - t := time.NewTicker(time.Second) - defer t.Stop() - for { - select { - case <-t.C: - err = c.tick() - case <-c.ctx.Done(): - return - } - if err != nil { - log.Printf("controller: %v", err) - } - } - }() - c.mutex.Unlock() - return c.tick() -} - -func (c *controller) tick() (err error) { - c.mutex.Lock() - defer c.mutex.Unlock() - var ok bool - if c.shard.ID == 0 { - c.agent.Read(c.ctx, func(state *zongzi.State) { - state.ShardIterate(func(s zongzi.Shard) bool { - if s.Type == uri { - c.shard = s - return false - } - return true - }) - }) - } - if c.shard.ID > 0 { - // Resolve replica clients - c.agent.Read(c.ctx, func(state *zongzi.State) { - if c.shard, ok = state.Shard(c.shard.ID); !ok { - return - } - if c.shard.Updated <= c.index && c.lastHostID == state.HostID() { - return - } - var members []*zongzi.Client - var clients []*zongzi.Client - state.ReplicaIterateByShardID(c.shard.ID, func(r zongzi.Replica) bool { - rc := c.agent.Client(r.HostID) - clients = append(clients, rc) - if !r.IsNonVoting { - members = append(members, rc) - } - return true - }) - c.members = members - c.clients = clients - c.index = c.shard.Updated - c.lastHostID = state.HostID() - - // Print snapshot - buf := bytes.NewBufferString("") - state.Save(buf) - log.Print(buf.String()) - }) - } - return -} - -func (c *controller) Stop() { - c.ctxCancel() - c.wg.Wait() -} - -func (c *controller) LeaderUpdated(info zongzi.LeaderInfo) { - c.mutex.Lock() - defer c.mutex.Unlock() - if c.shard.ID == 0 { - return - } - log.Printf("[%05d:%05d] LeaderUpdated: %05d", info.ShardID, info.ReplicaID, info.LeaderID) -} - -func (c *controller) getClient(random, member bool) (rc *zongzi.Client) { - c.mutex.RLock() - defer c.mutex.RUnlock() - if !random { - rc = c.agent.Client(c.agent.HostID()) - } else if member { - if len(c.members) > 0 { - rc = 
c.members[rand.Intn(len(c.members))] - } - } else { - if len(c.clients) > 0 { - rc = c.clients[rand.Intn(len(c.clients))] - } - } - return -} diff --git a/_example/kv1/handler.go b/_example/kv1/handler.go index e3d5301..e9cd43b 100644 --- a/_example/kv1/handler.go +++ b/_example/kv1/handler.go @@ -4,25 +4,35 @@ import ( "context" "encoding/json" "fmt" + "hash/fnv" "net/http" "strconv" "time" + + "github.com/logbn/zongzi" ) type handler struct { - ctrl *controller + clients []zongzi.ShardClient +} + +func hash(b []byte) uint32 { + hash := fnv.New32a() + hash.Write(b) + return hash.Sum32() } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer w.Write([]byte("\n")) var err error + var shard = int(hash([]byte(r.URL.Path)) % uint32(len(h.clients))) ctx, _ := context.WithTimeout(context.Background(), time.Second) if r.Method == "GET" { query := kvQuery{ Op: queryOpRead, Key: r.URL.Path, } - code, data, err := h.ctrl.getClient(r.FormValue("local") != "true", false).Query(ctx, h.ctrl.shard.ID, query.MustMarshalBinary(), r.FormValue("stale") == "true") + code, data, err := h.clients[shard].Query(ctx, query.MustMarshalBinary(), r.FormValue("stale") == "true") if err != nil { w.WriteHeader(500) w.Write([]byte(err.Error())) @@ -56,9 +66,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var code uint64 var data []byte if r.FormValue("stale") == "true" { - err = h.ctrl.getClient(r.FormValue("local") != "true", true).Commit(ctx, h.ctrl.shard.ID, cmd.MustMarshalBinary()) + err = h.clients[shard].Commit(ctx, cmd.MustMarshalBinary()) } else { - code, data, err = h.ctrl.getClient(r.FormValue("local") != "true", true).Apply(ctx, h.ctrl.shard.ID, cmd.MustMarshalBinary()) + code, data, err = h.clients[shard].Apply(ctx, cmd.MustMarshalBinary()) if code == ResultCodeFailure { w.WriteHeader(400) w.Write(data) diff --git a/_example/kv1/main.go b/_example/kv1/main.go index 96d7086..659d50b 100644 --- a/_example/kv1/main.go +++ b/_example/kv1/main.go @@ -32,7 +32,6 @@ func main() { flag.Parse() zongzi.SetLogLevelDebug() ctx := context.Background() - ctrl := newController() agent, err := zongzi.NewAgent(*name, strings.Split(*peers, ","), zongzi.WithHostConfig(zongzi.HostConfig{ NodeHostDir: *dataDir + "/raft", @@ -43,7 +42,6 @@ func main() { zongzi.WithGossipAddress(*gossipAddr), zongzi.WithRaftAddress(*raftAddr), zongzi.WithApiAddress(*zongziAddr), - zongzi.WithRaftEventListener(ctrl), zongzi.WithHostTags( fmt.Sprintf(`geo:region=%s`, *region), fmt.Sprintf(`geo:zone=%s`, *zone))) @@ -54,9 +52,9 @@ func main() { if err = agent.Start(ctx); err != nil { panic(err) } - // var clients = make([]zongzi.ShardClient, *shards) - for i := 1; i <= *shards; i++ { - _, _, err := agent.RegisterShard(ctx, uri, + var clients = make([]zongzi.ShardClient, *shards) + for i := 0; i < *shards; i++ { + shard, _, err := agent.RegisterShard(ctx, uri, zongzi.WithName(fmt.Sprintf(`%s-%05d`, *name, i)), zongzi.WithPlacementVary(`geo:zone`), zongzi.WithPlacementMembers(3, `geo:region=`+*region)) @@ -64,23 +62,20 @@ func main() { if err != nil { panic(err) } - // clients[i] = agent.ShardClient(shard.ID) - } - if err = ctrl.Start(agent); err != nil { - panic(err) + clients[i] = agent.ShardClient(shard.ID) } // Start HTTP API go func(s *http.Server) { log.Fatal(s.ListenAndServe()) }(&http.Server{ Addr: *httpAddr, - Handler: &handler{ctrl}, + Handler: &handler{clients}, }) stop := make(chan os.Signal) signal.Notify(stop, os.Interrupt) signal.Notify(stop, syscall.SIGTERM) <-stop - ctrl.Stop() + 
agent.Stop() } diff --git a/agent.go b/agent.go index f66d840..7bd8d50 100644 --- a/agent.go +++ b/agent.go @@ -23,6 +23,7 @@ type Agent struct { clusterName string hostController *hostController shardControllerManager *shardControllerManager + shardClientManager *shardClientManager fsm *fsm grpcClientPool *grpcClientPool grpcServer *grpcServer @@ -66,6 +67,7 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent status: AgentStatus_Pending, } a.shardControllerManager = newShardControllerManager(a) + a.shardClientManager = newShardClientManager(a) for _, opt := range append([]AgentOption{ WithApiAddress(DefaultApiAddress), WithGossipAddress(DefaultGossipAddress), @@ -75,7 +77,10 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent opt(a) } a.hostController = newHostController(a) - a.hostConfig.RaftEventListener = newCompositeRaftEventListener(a.shardControllerManager, a.hostConfig.RaftEventListener) + a.hostConfig.RaftEventListener = newCompositeRaftEventListener( + a.shardControllerManager, + a.hostConfig.RaftEventListener, + ) a.replicaConfig.ShardID = ShardID a.hostConfig.DeploymentID = mustBase36Decode(clusterName) a.hostConfig.AddressByNodeHostID = true @@ -91,6 +96,7 @@ func (a *Agent) Start(ctx context.Context) (err error) { if err == nil { a.hostController.Start() a.shardControllerManager.Start() + a.shardClientManager.Start() a.setStatus(AgentStatus_Ready) } }() @@ -206,17 +212,24 @@ func (a *Agent) Start(ctx context.Context) (err error) { return } -// Client returns a Client for a specific host. -func (a *Agent) Client(hostID string) (c *Client) { +// HostClient returns a Client for a specific host. +func (a *Agent) HostClient(hostID string) (c HostClient) { a.Read(a.ctx, func(s *State) { host, ok := s.Host(hostID) if ok { - c = newClient(a, host) + c = newHostClient(a, host) } }) return } +// ShardClient returns a Client for a specific shard. +// It will send writes to the nearest member and send reads to the nearest replica (by ping). +func (a *Agent) ShardClient(shardID uint64) (c ShardClient) { + c, _ = newShardClient(a.shardClientManager, shardID) + return +} + // Status returns the agent status func (a *Agent) Status() AgentStatus { a.mutex.RLock() @@ -231,7 +244,7 @@ func (a *Agent) Status() AgentStatus { // return nil // }) // -// Linear reads are enable by default to achieve "Read Your Writes" consistency following a proposal. Pass optional +// Linear reads are enabled by default to achieve "Read Your Writes" consistency following a proposal. Pass optional // argument _stale_ as true to disable linearizable reads (for higher performance). State will always provide snapshot // isolation, even for stale reads. // @@ -250,6 +263,11 @@ func (a *Agent) Read(ctx context.Context, fn func(*State), stale ...bool) (err e return } +func (a *Agent) ReadStale(fn func(*State)) (err error) { + fn(a.fsm.state.withTxn(false)) + return +} + // RegisterShard creates a new shard. If shard name option is provided and shard already exists, // found shard is updated. 
func (a *Agent) RegisterShard(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) { diff --git a/agent_integration_test.go b/agent_integration_test.go index 66d3476..ff6b50b 100644 --- a/agent_integration_test.go +++ b/agent_integration_test.go @@ -143,9 +143,12 @@ func TestAgent(t *testing.T) { } for _, op := range []string{"update", "query"} { for _, linearity := range []string{"linear", "non-linear"} { - t.Run(fmt.Sprintf(`%s %s %s`, sm, op, linearity), func(t *testing.T) { + t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) { runAgentSubTest(t, agents, shard, sm, op, linearity != "linear") }) + t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) { + runAgentSubTestByShard(t, agents, shard, sm, op, linearity != "linear") + }) } } } @@ -266,7 +269,7 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, sm, op string, return true } val = 0 - client := agents[0].Client(r.HostID) + client := agents[0].HostClient(r.HostID) require.NotNil(t, client) if op == "update" && stale { err = client.Commit(raftCtx(), shard.ID, bytes.Repeat([]byte("test"), i+1)) @@ -290,6 +293,31 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, sm, op string, } } +func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, sm, op string, stale bool) { + var i = int(1e6) + var err error + var val uint64 + for _, a := range agents { + val = 0 + client := a.ShardClient(shard.ID) + require.NotNil(t, client) + if op == "update" && stale { + err = client.Commit(raftCtx(), bytes.Repeat([]byte("test"), i+1)) + } else if op == "update" && !stale { + val, _, err = client.Apply(raftCtx(), bytes.Repeat([]byte("test"), i+1)) + } else { + val, _, err = client.Query(raftCtx(), bytes.Repeat([]byte("test"), i+1), stale) + } + require.Nil(t, err, `%v, %v, %#v`, i, err, client) + if op == "update" && stale { + assert.Equal(t, uint64(0), val) + } else { + assert.Equal(t, uint64((i+1)*4), val) + } + i++ + } +} + func await(d, n time.Duration, fn func() bool) bool { for i := 0; i < int(n); i++ { if fn() { diff --git a/go.mod b/go.mod index 0dfc386..5ce496a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/benbjohnson/clock v1.3.0 + github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/hashicorp/go-memdb v1.3.4 github.com/hashicorp/golang-lru/v2 v2.0.2 github.com/lni/dragonboat/v4 v4.0.0-20230202152124-023bafb8e648 diff --git a/go.sum b/go.sum index 26dfc25..a0e0b77 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= +github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/grpc_server.go 
b/grpc_server.go index 17b45f1..b74b2f2 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -26,6 +26,10 @@ func newGrpcServer(listenAddr string, opts ...grpc.ServerOption) *grpcServer { } } +func (s *grpcServer) Ping(ctx context.Context, req *internal.PingRequest) (res *internal.PingResponse, err error) { + return &internal.PingResponse{}, nil +} + func (s *grpcServer) Probe(ctx context.Context, req *internal.ProbeRequest) (res *internal.ProbeResponse, err error) { // s.agent.log.Debugf(`gRPC Req Probe: %#v`, req) return &internal.ProbeResponse{ diff --git a/client.go b/host_client.go similarity index 55% rename from client.go rename to host_client.go index 25ffbae..0dcc17e 100644 --- a/client.go +++ b/host_client.go @@ -2,42 +2,56 @@ package zongzi import ( "context" + "time" + + "github.com/benbjohnson/clock" "github.com/logbn/zongzi/internal" ) -type Client struct { +type HostClient interface { + Ping(ctx context.Context) (t time.Duration, err error) + Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error) + Commit(ctx context.Context, shardID uint64, cmd []byte) (err error) + Query(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error) +} + +type hostclient struct { agent *Agent + clock clock.Clock hostApiAddress string hostID string } -func newClient(a *Agent, host Host) *Client { - return &Client{ +func newHostClient(a *Agent, host Host) *hostclient { + return &hostclient{ agent: a, + clock: clock.New(), hostApiAddress: host.ApiAddress, hostID: host.ID, } } -func (c *Client) Ping(ctx context.Context) (err error) { +func (c *hostclient) Ping(ctx context.Context) (t time.Duration, err error) { if c.hostID == c.agent.HostID() { return } + start := c.clock.Now() _, err = c.agent.grpcClientPool.get(c.hostApiAddress).Ping(ctx, &internal.PingRequest{}) + t = c.clock.Since(start) return } -func (c *Client) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error) { +func (c *hostclient) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error) { var res *internal.ApplyResponse if c.hostID == c.agent.HostID() { - c.agent.log.Debugf(`gRPC Client Apply Local: %s`, string(cmd)) + c.agent.log.Debugf(`gRPC HostClient Apply Local: %s`, string(cmd)) res, err = c.agent.grpcServer.Apply(ctx, &internal.ApplyRequest{ ShardId: shardID, Data: cmd, }) } else { - c.agent.log.Debugf(`gRPC Client Apply Remote: %s`, string(cmd)) + c.agent.log.Debugf(`gRPC HostClient Apply Remote: %s`, string(cmd)) res, err = c.agent.grpcClientPool.get(c.hostApiAddress).Apply(ctx, &internal.ApplyRequest{ ShardId: shardID, Data: cmd, @@ -51,15 +65,15 @@ func (c *Client) Apply(ctx context.Context, shardID uint64, cmd []byte) (value u return } -func (c *Client) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error) { +func (c *hostclient) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error) { if c.hostID == c.agent.HostID() { - c.agent.log.Debugf(`gRPC Client Commit Local: %s`, string(cmd)) + c.agent.log.Debugf(`gRPC HostClient Commit Local: %s`, string(cmd)) _, err = c.agent.grpcServer.Commit(ctx, &internal.CommitRequest{ ShardId: shardID, Data: cmd, }) } else { - c.agent.log.Debugf(`gRPC Client Commit Remote: %s`, string(cmd)) + c.agent.log.Debugf(`gRPC HostClient Commit Remote: %s`, string(cmd)) _, err = c.agent.grpcClientPool.get(c.hostApiAddress).Commit(ctx, &internal.CommitRequest{ ShardId: shardID, Data: cmd, @@ -71,7 +85,7 @@ func (c 
*Client) Commit(ctx context.Context, shardID uint64, cmd []byte) (err er
 	return
 }
 
-func (c *Client) Query(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error) {
+func (c *hostclient) Query(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error) {
 	var res *internal.QueryResponse
 	if c.hostID == c.agent.HostID() {
 		res, err = c.agent.grpcServer.Query(ctx, &internal.QueryRequest{
diff --git a/shard_client.go b/shard_client.go
new file mode 100644
index 0000000..339f3ae
--- /dev/null
+++ b/shard_client.go
@@ -0,0 +1,76 @@
+package zongzi
+
+import (
+	"context"
+)
+
+// ShardClient can be used to interact with a shard regardless of its placement in the cluster.
+// Requests are forwarded to the appropriate host based on ping (nearest first).
+type ShardClient interface {
+	Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error)
+	Commit(ctx context.Context, cmd []byte) (err error)
+	Query(ctx context.Context, query []byte, stale ...bool) (value uint64, data []byte, err error)
+}
+
+// The shardClient forwards requests to the shard's hosts in ascending order of ping.
+type shardClient struct {
+	manager *shardClientManager
+	shardID uint64
+	retries int
+}
+
+func newShardClient(manager *shardClientManager, shardID uint64, opts ...ShardClientOption) (c *shardClient, err error) {
+	c = &shardClient{
+		manager: manager,
+		shardID: shardID,
+	}
+	for _, fn := range opts {
+		if err = fn(c); err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *shardClient) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) {
+	c.manager.mutex.RLock()
+	el := c.manager.clientMember[c.shardID].Front()
+	c.manager.mutex.RUnlock()
+	// Try members in ascending order of ping, stopping at the first success.
+	for ; el != nil; el = el.Next() {
+		if value, data, err = el.Value.Apply(ctx, c.shardID, cmd); err == nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *shardClient) Commit(ctx context.Context, cmd []byte) (err error) {
+	c.manager.mutex.RLock()
+	el := c.manager.clientMember[c.shardID].Front()
+	c.manager.mutex.RUnlock()
+	for ; el != nil; el = el.Next() {
+		if err = el.Value.Commit(ctx, c.shardID, cmd); err == nil {
+			return
+		}
+	}
+	return
+}
+
+func (c *shardClient) Query(ctx context.Context, query []byte, stale ...bool) (value uint64, data []byte, err error) {
+	if len(stale) > 0 && stale[0] {
+		// Stale reads prefer the nearest non-voting replica, falling back to members on error.
+		c.manager.mutex.RLock()
+		el := c.manager.clientReplica[c.shardID].Front()
+		c.manager.mutex.RUnlock()
+		for ; el != nil; el = el.Next() {
+			if value, data, err = el.Value.Query(ctx, c.shardID, query); err == nil {
+				return
+			}
+		}
+	}
+	c.manager.mutex.RLock()
+	el := c.manager.clientMember[c.shardID].Front()
+	c.manager.mutex.RUnlock()
+	for ; el != nil; el = el.Next() {
+		if value, data, err = el.Value.Query(ctx, c.shardID, query); err == nil {
+			return
+		}
+	}
+	return
+}
diff --git a/shard_client_manager.go b/shard_client_manager.go
new file mode 100644
index 0000000..0c3782b
--- /dev/null
+++ b/shard_client_manager.go
@@ -0,0 +1,139 @@
+package zongzi
+
+import (
+	"cmp"
+	"context"
+	"slices"
+	"sync"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	"github.com/elliotchance/orderedmap/v2"
+)
+
+// The shardClientManager keeps a ping-ordered set of host clients for the members and replicas of every shard.
+type shardClientManager struct {
+	agent           *Agent
+	clock           clock.Clock
+	ctx             context.Context
+	ctxCancel       context.CancelFunc
+	clientHost      map[string]HostClient
+	clientMember    map[uint64]*orderedmap.OrderedMap[int64, HostClient]
+	clientReplica   map[uint64]*orderedmap.OrderedMap[int64, HostClient]
+	index           uint64
+	log             Logger
+	mutex           sync.RWMutex
+	shardController ShardController
+	wg              sync.WaitGroup
+}
+
+func newShardClientManager(agent *Agent) *shardClientManager {
+	return &shardClientManager{
+		log:           agent.log,
+		agent:         agent,
+		clock:         clock.New(),
+		clientHost:    map[string]HostClient{},
+		clientMember:  map[uint64]*orderedmap.OrderedMap[int64, HostClient]{},
+		clientReplica: map[uint64]*orderedmap.OrderedMap[int64, HostClient]{},
+	}
+}
+
+func (c *shardClientManager) Start() (err error) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		t := c.clock.Ticker(500 * time.Millisecond)
+		defer t.Stop()
+		for {
+			select {
+			case <-c.ctx.Done():
+				c.log.Infof("Shard client manager stopped")
+				return
+			case <-t.C:
+				c.tick()
+			}
+		}
+	}()
+	return
+}
+
+type hostClientPing struct {
+	ping   int64
+	client HostClient
+}
+
+func (c *shardClientManager) tick() {
+	var err error
+	var index uint64
+	var start = c.clock.Now()
+	var shardCount int
+	var replicaCount int
+	var pings = map[string]time.Duration{}
+	err = c.agent.Read(c.ctx, func(state *State) {
+		state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
+			shardCount++
+			index = shard.Updated
+			members := []hostClientPing{}
+			replicas := []hostClientPing{}
+			state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
+				replicaCount++
+				client, ok := c.clientHost[replica.HostID]
+				if !ok {
+					client = c.agent.HostClient(replica.HostID)
+					c.clientHost[replica.HostID] = client
+				}
+				ping, ok := pings[replica.HostID]
+				if !ok {
+					ctx, cancel := context.WithTimeout(c.ctx, time.Second)
+					defer cancel()
+					ping, err = client.Ping(ctx)
+					if err != nil {
+						c.log.Warningf(`Unable to ping host in shard client manager: %s`, err.Error())
+						return true
+					}
+					pings[replica.HostID] = ping
+				}
+				if replica.IsNonVoting {
+					replicas = append(replicas, hostClientPing{ping.Nanoseconds(), client})
+				} else {
+					members = append(members, hostClientPing{ping.Nanoseconds(), client})
+				}
+				return true
+			})
+			slices.SortFunc(members, func(a, b hostClientPing) int { return cmp.Compare(a.ping, b.ping) })
+			slices.SortFunc(replicas, func(a, b hostClientPing) int { return cmp.Compare(a.ping, b.ping) })
+			newMembers := orderedmap.NewOrderedMap[int64, HostClient]()
+			for _, item := range members {
+				newMembers.Set(item.ping, item.client)
+			}
+			newReplicas := orderedmap.NewOrderedMap[int64, HostClient]()
+			for _, item := range replicas {
+				newReplicas.Set(item.ping, item.client)
+			}
+			c.mutex.Lock()
+			c.clientMember[shard.ID] = newMembers
+			c.clientReplica[shard.ID] = newReplicas
+			c.mutex.Unlock()
+			return true
+		})
+	}, true)
+	if err == nil && shardCount > 0 {
+		c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d time: %vms", c.agent.HostID(), len(pings), shardCount, replicaCount, int(c.clock.Since(start)/time.Millisecond))
+		c.index = index
+	}
+	return
+}
+
+func (c *shardClientManager) Stop() {
+	defer c.log.Infof(`Stopped shardClientManager`)
+	if c.ctxCancel != nil {
+		c.ctxCancel()
+	}
+	c.wg.Wait()
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.index = 0
+}
diff --git a/shard_client_option.go b/shard_client_option.go
new file mode 100644
index 0000000..6bedac2
--- /dev/null
+++ b/shard_client_option.go
@@ -0,0 +1,10 @@
+package zongzi
+
+type ShardClientOption func(*shardClient) error
+
+func WithRetries(retries int) ShardClientOption {
+	return func(c *shardClient) error {
+		c.retries = retries
+		return nil
+	}
+}
diff --git a/shard_controller_test.go b/shard_controller_test.go
index ef36075..67ef4fd 100644
--- a/shard_controller_test.go
+++ b/shard_controller_test.go
@@ -8,7 +8,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestShardController(t *testing.T) {
+func TestShardControllerDefault(t *testing.T) {
 	t.Run("reconcile", func(t *testing.T) {
 		state := newFsmStateRadix()
 		testHelperFillHosts(state, 3)
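
Usage sketch (reviewer illustration, not part of the patch): the snippet below shows how a caller might consume the ShardClient API added above, condensed from _example/kv1/main.go. The cluster name, peer list, state machine URI, and command payloads are placeholders, and the host config and state machine registration options from the full example are omitted.

package main

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/logbn/zongzi"
)

func main() {
	ctx := context.Background()
	// Placeholder cluster name and peers; see _example/kv1/main.go for the full agent options.
	agent, err := zongzi.NewAgent("kv1", strings.Split("10.0.0.1:10801,10.0.0.2:10801,10.0.0.3:10801", ","))
	if err != nil {
		log.Fatal(err)
	}
	if err = agent.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer agent.Stop()

	// Register (or look up) a shard, then obtain a client bound to it.
	// "zongzi://example/kv" stands in for the example's state machine uri constant.
	shard, _, err := agent.RegisterShard(ctx, "zongzi://example/kv", zongzi.WithName("kv1-00000"))
	if err != nil {
		log.Fatal(err)
	}
	client := agent.ShardClient(shard.ID)

	// Writes are proposed through the nearest member (by ping).
	wctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	if _, _, err = client.Apply(wctx, []byte("example command")); err != nil {
		log.Fatal(err)
	}

	// Pass stale=true to let the nearest replica serve the read.
	rctx, cancel2 := context.WithTimeout(ctx, time.Second)
	defer cancel2()
	_, data, err := client.Query(rctx, []byte("example query"), true)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("query result: %s", data)
}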