Skip to content

Commit

Permalink
Add shard client
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Mar 9, 2024
1 parent db47ee6 commit 9286ee6
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 165 deletions.
131 changes: 0 additions & 131 deletions _example/kv1/controller.go

This file was deleted.

18 changes: 14 additions & 4 deletions _example/kv1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,35 @@ import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"net/http"
"strconv"
"time"

"github.com/logbn/zongzi"
)

type handler struct {
ctrl *controller
clients []zongzi.ShardClient
}

func hash(b []byte) uint32 {
hash := fnv.New32a()
hash.Write(b)
return hash.Sum32()
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer w.Write([]byte("\n"))
var err error
var shard = int(hash([]byte(r.URL.Path)) % uint32(len(h.clients)))
ctx, _ := context.WithTimeout(context.Background(), time.Second)
if r.Method == "GET" {
query := kvQuery{
Op: queryOpRead,
Key: r.URL.Path,
}
code, data, err := h.ctrl.getClient(r.FormValue("local") != "true", false).Query(ctx, h.ctrl.shard.ID, query.MustMarshalBinary(), r.FormValue("stale") == "true")
code, data, err := h.clients[shard].Query(ctx, query.MustMarshalBinary(), r.FormValue("stale") == "true")
if err != nil {
w.WriteHeader(500)
w.Write([]byte(err.Error()))
Expand Down Expand Up @@ -56,9 +66,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var code uint64
var data []byte
if r.FormValue("stale") == "true" {
err = h.ctrl.getClient(r.FormValue("local") != "true", true).Commit(ctx, h.ctrl.shard.ID, cmd.MustMarshalBinary())
err = h.clients[shard].Commit(ctx, cmd.MustMarshalBinary())
} else {
code, data, err = h.ctrl.getClient(r.FormValue("local") != "true", true).Apply(ctx, h.ctrl.shard.ID, cmd.MustMarshalBinary())
code, data, err = h.clients[shard].Apply(ctx, cmd.MustMarshalBinary())
if code == ResultCodeFailure {
w.WriteHeader(400)
w.Write(data)
Expand Down
17 changes: 6 additions & 11 deletions _example/kv1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func main() {
flag.Parse()
zongzi.SetLogLevelDebug()
ctx := context.Background()
ctrl := newController()
agent, err := zongzi.NewAgent(*name, strings.Split(*peers, ","),
zongzi.WithHostConfig(zongzi.HostConfig{
NodeHostDir: *dataDir + "/raft",
Expand All @@ -43,7 +42,6 @@ func main() {
zongzi.WithGossipAddress(*gossipAddr),
zongzi.WithRaftAddress(*raftAddr),
zongzi.WithApiAddress(*zongziAddr),
zongzi.WithRaftEventListener(ctrl),
zongzi.WithHostTags(
fmt.Sprintf(`geo:region=%s`, *region),
fmt.Sprintf(`geo:zone=%s`, *zone)))
Expand All @@ -54,33 +52,30 @@ func main() {
if err = agent.Start(ctx); err != nil {
panic(err)
}
// var clients = make([]zongzi.ShardClient, *shards)
for i := 1; i <= *shards; i++ {
_, _, err := agent.RegisterShard(ctx, uri,
var clients = make([]zongzi.ShardClient, *shards)
for i := 0; i < *shards; i++ {
shard, _, err := agent.RegisterShard(ctx, uri,
zongzi.WithName(fmt.Sprintf(`%s-%05d`, *name, i)),
zongzi.WithPlacementVary(`geo:zone`),
zongzi.WithPlacementMembers(3, `geo:region=`+*region))
// zongzi.WithPlacementReplicas(*region, 3, `geo:region=`+*region)) // Place 3 read replicas in this region
if err != nil {
panic(err)
}
// clients[i] = agent.ShardClient(shard.ID)
}
if err = ctrl.Start(agent); err != nil {
panic(err)
clients[i] = agent.ShardClient(shard.ID)
}
// Start HTTP API
go func(s *http.Server) {
log.Fatal(s.ListenAndServe())
}(&http.Server{
Addr: *httpAddr,
Handler: &handler{ctrl},
Handler: &handler{clients},
})

stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
signal.Notify(stop, syscall.SIGTERM)
<-stop
ctrl.Stop()

agent.Stop()
}
28 changes: 23 additions & 5 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Agent struct {
clusterName string
hostController *hostController
shardControllerManager *shardControllerManager
shardClientManager *shardClientManager
fsm *fsm
grpcClientPool *grpcClientPool
grpcServer *grpcServer
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent
status: AgentStatus_Pending,
}
a.shardControllerManager = newShardControllerManager(a)
a.shardClientManager = newShardClientManager(a)
for _, opt := range append([]AgentOption{
WithApiAddress(DefaultApiAddress),
WithGossipAddress(DefaultGossipAddress),
Expand All @@ -75,7 +77,10 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent
opt(a)
}
a.hostController = newHostController(a)
a.hostConfig.RaftEventListener = newCompositeRaftEventListener(a.shardControllerManager, a.hostConfig.RaftEventListener)
a.hostConfig.RaftEventListener = newCompositeRaftEventListener(
a.shardControllerManager,
a.hostConfig.RaftEventListener,
)
a.replicaConfig.ShardID = ShardID
a.hostConfig.DeploymentID = mustBase36Decode(clusterName)
a.hostConfig.AddressByNodeHostID = true
Expand All @@ -91,6 +96,7 @@ func (a *Agent) Start(ctx context.Context) (err error) {
if err == nil {
a.hostController.Start()
a.shardControllerManager.Start()
a.shardClientManager.Start()
a.setStatus(AgentStatus_Ready)
}
}()
Expand Down Expand Up @@ -206,17 +212,24 @@ func (a *Agent) Start(ctx context.Context) (err error) {
return
}

// Client returns a Client for a specific host.
func (a *Agent) Client(hostID string) (c *Client) {
// HostClient returns a Client for a specific host.
func (a *Agent) HostClient(hostID string) (c HostClient) {
a.Read(a.ctx, func(s *State) {
host, ok := s.Host(hostID)
if ok {
c = newClient(a, host)
c = newHostClient(a, host)
}
})
return
}

// ShardClient returns a Client for a specific shard.
// It will send writes to the nearest member and send reads to the nearest replica (by ping).
func (a *Agent) ShardClient(shardID uint64) (c ShardClient) {
c, _ = newShardClient(a.shardClientManager, shardID)
return
}

// Status returns the agent status
func (a *Agent) Status() AgentStatus {
a.mutex.RLock()
Expand All @@ -231,7 +244,7 @@ func (a *Agent) Status() AgentStatus {
// return nil
// })
//
// Linear reads are enable by default to achieve "Read Your Writes" consistency following a proposal. Pass optional
// Linear reads are enabled by default to achieve "Read Your Writes" consistency following a proposal. Pass optional
// argument _stale_ as true to disable linearizable reads (for higher performance). State will always provide snapshot
// isolation, even for stale reads.
//
Expand All @@ -250,6 +263,11 @@ func (a *Agent) Read(ctx context.Context, fn func(*State), stale ...bool) (err e
return
}

func (a *Agent) ReadStale(fn func(*State)) (err error) {
fn(a.fsm.state.withTxn(false))
return
}

// RegisterShard creates a new shard. If shard name option is provided and shard already exists,
// found shard is updated.
func (a *Agent) RegisterShard(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) {
Expand Down
32 changes: 30 additions & 2 deletions agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,12 @@ func TestAgent(t *testing.T) {
}
for _, op := range []string{"update", "query"} {
for _, linearity := range []string{"linear", "non-linear"} {
t.Run(fmt.Sprintf(`%s %s %s`, sm, op, linearity), func(t *testing.T) {
t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTest(t, agents, shard, sm, op, linearity != "linear")
})
t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTestByShard(t, agents, shard, sm, op, linearity != "linear")
})
}
}
}
Expand Down Expand Up @@ -266,7 +269,7 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, sm, op string,
return true
}
val = 0
client := agents[0].Client(r.HostID)
client := agents[0].HostClient(r.HostID)
require.NotNil(t, client)
if op == "update" && stale {
err = client.Commit(raftCtx(), shard.ID, bytes.Repeat([]byte("test"), i+1))
Expand All @@ -290,6 +293,31 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, sm, op string,
}
}

func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, sm, op string, stale bool) {
var i = int(1e6)
var err error
var val uint64
for _, a := range agents {
val = 0
client := a.ShardClient(shard.ID)
require.NotNil(t, client)
if op == "update" && stale {
err = client.Commit(raftCtx(), bytes.Repeat([]byte("test"), i+1))
} else if op == "update" && !stale {
val, _, err = client.Apply(raftCtx(), bytes.Repeat([]byte("test"), i+1))
} else {
val, _, err = client.Query(raftCtx(), bytes.Repeat([]byte("test"), i+1), stale)
}
require.Nil(t, err, `%v, %v, %#v`, i, err, client)
if op == "update" && stale {
assert.Equal(t, uint64(0), val)
} else {
assert.Equal(t, uint64((i+1)*4), val)
}
i++
}
}

func await(d, n time.Duration, fn func() bool) bool {
for i := 0; i < int(n); i++ {
if fn() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/benbjohnson/clock v1.3.0
github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/lni/dragonboat/v4 v4.0.0-20230202152124-023bafb8e648
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
Loading

0 comments on commit 9286ee6

Please sign in to comment.