From c1fb0e437185a77e35e81f77867944afae652f87 Mon Sep 17 00:00:00 2001 From: LingKa Date: Thu, 11 Jan 2024 17:49:33 +0800 Subject: [PATCH] refactor: propose id Signed-off-by: LingKa --- client/auth.go | 8 +--- client/client.go | 2 +- client/cluster.go | 2 +- client/compare.go | 2 +- client/error.go | 2 +- client/kv.go | 32 ++++--------- client/lease.go | 20 ++------ client/lock.go | 27 ++--------- client/maintenance.go | 2 +- client/op.go | 2 +- client/protocol.go | 103 ++++++++++++++++++++---------------------- client/txn.go | 8 +--- client/watch.go | 2 +- tests/lease_test.go | 2 +- tests/lock_test.go | 2 +- 15 files changed, 81 insertions(+), 135 deletions(-) diff --git a/client/auth.go b/client/auth.go index af43f2f..5b63c0f 100644 --- a/client/auth.go +++ b/client/auth.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "golang.org/x/crypto/pbkdf2" ) @@ -488,11 +488,7 @@ func (c *authClient) RoleRevokePermission(role string, key, rangeEnd []byte) (*A // Send request using fast path func (c *authClient) handleReq(req *xlineapi.RequestWithToken, useFastPath bool) (*proposeRes, error) { - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: req, ProposeId: pid} + cmd := xlineapi.Command{Request: req} if useFastPath { res, err := c.curpClient.Propose(&cmd, true) diff --git a/client/client.go b/client/client.go index bfaff1f..fa005c1 100644 --- a/client/client.go +++ b/client/client.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) diff --git a/client/cluster.go b/client/cluster.go index b0e87f2..11b138f 100644 --- a/client/cluster.go +++ b/client/cluster.go @@ -17,7 +17,7 @@ package client import ( "context" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "google.golang.org/grpc" ) diff --git a/client/compare.go b/client/compare.go index 015a59b..fa05231 100644 --- a/client/compare.go +++ b/client/compare.go @@ -14,7 +14,7 @@ package client -import "github.com/xline-kv/go-xline/api/xline" +import "github.com/xline-kv/go-xline/api/gen/xline" type CompareTarget int type CompareResult int diff --git a/client/error.go b/client/error.go index 9af4cb8..3094ffc 100644 --- a/client/error.go +++ b/client/error.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" - xlineapi "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" ) type CommandError struct { diff --git a/client/kv.go b/client/kv.go index 517702f..fbfc7b6 100644 --- a/client/kv.go +++ b/client/kv.go @@ -14,7 +14,7 @@ package client -import "github.com/xline-kv/go-xline/api/xline" +import "github.com/xline-kv/go-xline/api/gen/xline" type KV interface { // Put puts a key-value pair into xline. @@ -77,6 +77,7 @@ func NewKV(curpClient Curp, token string) KV { // Put a key-value into the store func (c *kvClient) Put(key, val []byte, opts ...OpOption) (*PutResponse, error) { op := OpPut(key, val, opts...) + // TODO: buildOpFromOp krs := []*xlineapi.KeyRange{op.toKeyRange()} req := xlineapi.RequestWithToken{ Token: &c.token, @@ -84,12 +85,8 @@ func (c *kvClient) Put(key, val []byte, opts ...OpOption) (*PutResponse, error) PutRequest: op.toPutReq(), }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Keys: krs, Request: &req, ProposeId: pid} - res, err := c.curpClient.Propose(&cmd, true) + cmd := &xlineapi.Command{Keys: krs, Request: &req} + res, err := c.curpClient.Propose(cmd, true) if err != nil { return nil, err } @@ -99,6 +96,7 @@ func (c *kvClient) Put(key, val []byte, opts ...OpOption) (*PutResponse, error) // Range a range of keys from the store func (c *kvClient) Range(key []byte, opt ...OpOption) (*RangeResponse, error) { op := OpRange(key, opt...) + // TODO: buildOpFromOp krs := []*xlineapi.KeyRange{op.toKeyRange()} req := xlineapi.RequestWithToken{ Token: &c.token, @@ -106,11 +104,7 @@ func (c *kvClient) Range(key []byte, opt ...OpOption) (*RangeResponse, error) { RangeRequest: op.toRangeReq(), }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Keys: krs, Request: &req, ProposeId: pid} + cmd := xlineapi.Command{Keys: krs, Request: &req} res, err := c.curpClient.Propose(&cmd, true) if err != nil { return nil, err @@ -121,6 +115,7 @@ func (c *kvClient) Range(key []byte, opt ...OpOption) (*RangeResponse, error) { // Delete a range of keys from the store func (c *kvClient) Delete(key string, opts ...OpOption) (*DeleteResponse, error) { op := OpDelete(key, opts...) + // TODO: buildOpFromOp krs := []*xlineapi.KeyRange{op.toKeyRange()} req := xlineapi.RequestWithToken{ Token: &c.token, @@ -128,11 +123,7 @@ func (c *kvClient) Delete(key string, opts ...OpOption) (*DeleteResponse, error) DeleteRangeRequest: op.toDeleteReq(), }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Keys: krs, Request: &req, ProposeId: pid} + cmd := xlineapi.Command{Keys: krs, Request: &req} res, err := c.curpClient.Propose(&cmd, false) if err != nil { return nil, err @@ -152,17 +143,14 @@ func (c *kvClient) Txn() Txn { func (c *kvClient) Compact(rev int64, opts ...OpOption) (*CompactResponse, error) { r := OpCompact(rev, opts...).toCompactReq() useFastPath := r.Physical + // TODO: buildOpFromOp req := xlineapi.RequestWithToken{ Token: &c.token, RequestWrapper: &xlineapi.RequestWithToken_CompactionRequest{ CompactionRequest: r, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &req, ProposeId: pid} + cmd := xlineapi.Command{Request: &req} res, err := c.curpClient.Propose(&cmd, useFastPath) if err != nil { return nil, err diff --git a/client/lease.go b/client/lease.go index 7ed1ef7..fe052ab 100644 --- a/client/lease.go +++ b/client/lease.go @@ -18,7 +18,7 @@ import ( "context" "time" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap" "google.golang.org/grpc" @@ -101,11 +101,7 @@ func (c *leaseClient) Grant(ttl int64, opts ...LeaseOption) (*LeaseGrantResponse LeaseGrantRequest: request, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, true) if err != nil { @@ -122,11 +118,7 @@ func (c *leaseClient) Revoke(id int64) (*LeaseRevokeResponse, error) { LeaseRevokeRequest: &xlineapi.LeaseRevokeRequest{ID: id}, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, true) if err != nil { @@ -213,11 +205,7 @@ func (c *leaseClient) Leases() (*LeaseLeasesResponse, error) { LeaseLeasesRequest: &xlineapi.LeaseLeasesRequest{}, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, true) if err != nil { diff --git a/client/lock.go b/client/lock.go index f182c8b..fc3bbe5 100644 --- a/client/lock.go +++ b/client/lock.go @@ -6,7 +6,7 @@ import ( "fmt" "math" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap" ) @@ -69,13 +69,8 @@ func (c *lockClient) lockInner( TxnRequest: &txn, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } cmd := xlineapi.Command{ - Request: &requestWithToken, - ProposeId: pid, + Request: &requestWithToken, } res, err := c.curpClient.Propose(&cmd, false) if err != nil { @@ -108,11 +103,7 @@ func (c *lockClient) lockInner( RangeRequest: &rangeReq, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, true) if err == nil { @@ -240,11 +231,7 @@ func (c *lockClient) waitDelete(pfx string, myRev int64) { RangeRequest: &getReq, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, false) if err != nil { @@ -297,11 +284,7 @@ func (c *lockClient) deleteKey(key []byte) (*xlineapi.ResponseHeader, error) { DeleteRangeRequest: &delReq, }, } - pid, err := c.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Request: &requestWithToken} res, err := c.curpClient.Propose(&cmd, true) if err != nil { diff --git a/client/maintenance.go b/client/maintenance.go index 615c664..5ea8b69 100644 --- a/client/maintenance.go +++ b/client/maintenance.go @@ -3,7 +3,7 @@ package client import ( "context" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "google.golang.org/grpc" ) diff --git a/client/op.go b/client/op.go index 0370bce..b35e405 100644 --- a/client/op.go +++ b/client/op.go @@ -14,7 +14,7 @@ package client -import "github.com/xline-kv/go-xline/api/xline" +import "github.com/xline-kv/go-xline/api/gen/xline" // Op represents an Operation that kv can execute. type Op struct { diff --git a/client/protocol.go b/client/protocol.go index b07d1eb..77fd352 100644 --- a/client/protocol.go +++ b/client/protocol.go @@ -21,8 +21,8 @@ import ( "math/rand" "time" - curpapi "github.com/xline-kv/go-xline/api/curp" - xlineapi "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/curp" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap" "google.golang.org/grpc" @@ -36,7 +36,7 @@ type Curp interface { Propose(cmd *xlineapi.Command, useFastPath bool) (*proposeRes, error) // Generate a propose id - GenProposeID() (*xlineapi.ProposeId, error) + GenProposeID() (*curpapi.ProposeId, error) } // Protocol client @@ -132,11 +132,15 @@ func fastFetchCluster(addrs []string, proposeTimeout time.Duration) (*curpapi.Fe func (c *protocolClient) Propose(cmd *xlineapi.Command, useFastPath bool) (*proposeRes, error) { var res *proposeRes var err error + pid, err := c.GenProposeID() + if err != nil { + return nil, err + } for { if useFastPath { - res, err = c.fastPath(cmd) + res, err = c.fastPath(pid, cmd) } else { - res, err = c.slowPath(cmd) + res, err = c.slowPath(pid, cmd) } if errors.Is(err, ErrWrongClusterVersion) { @@ -155,21 +159,20 @@ func (c *protocolClient) Propose(cmd *xlineapi.Command, useFastPath bool) (*prop } // Fast path of propose -func (c *protocolClient) fastPath(cmd *xlineapi.Command) (*proposeRes, error) { +func (c *protocolClient) fastPath(pid *curpapi.ProposeId, cmd *xlineapi.Command) (*proposeRes, error) { fastCh := make(chan *fastRoundRes) slowCh := make(chan *slowRoundRes) errCh := make(chan error) go func() { - res, err := c.fastRound(cmd) + res, err := c.fastRound(pid, cmd) if err != nil { - errCh <- err return } fastCh <- res }() go func() { - res, err := c.slowRound(cmd) + res, err := c.slowRound(pid, cmd) if err != nil { errCh <- err return @@ -192,16 +195,16 @@ func (c *protocolClient) fastPath(cmd *xlineapi.Command) (*proposeRes, error) { } // Slow path of propose -func (c *protocolClient) slowPath(cmd *xlineapi.Command) (*proposeRes, error) { +func (c *protocolClient) slowPath(pid *curpapi.ProposeId, cmd *xlineapi.Command) (*proposeRes, error) { slowCh := make(chan *slowRoundRes) errCh := make(chan error) go func() { // nolint: errcheck - c.fastRound(cmd) + c.fastRound(pid, cmd) }() go func() { - res, err := c.slowRound(cmd) + res, err := c.slowRound(pid, cmd) if err != nil { errCh <- err } @@ -218,8 +221,8 @@ func (c *protocolClient) slowPath(cmd *xlineapi.Command) (*proposeRes, error) { // The fast round of Curp protocol // It broadcast the requests to all the curp servers. -func (c *protocolClient) fastRound(cmd *xlineapi.Command) (*fastRoundRes, error) { - c.logger.Info("fast round started", zap.Any("propose ID", cmd.ProposeId)) +func (c *protocolClient) fastRound(pid *curpapi.ProposeId, cmd *xlineapi.Command) (*fastRoundRes, error) { + c.logger.Info("fast round started", zap.Any("propose ID", pid)) resCh := make(chan *curpapi.ProposeResponse) errCh := make(chan error) @@ -234,6 +237,7 @@ func (c *protocolClient) fastRound(cmd *xlineapi.Command) (*fastRoundRes, error) return nil, err } req := &curpapi.ProposeRequest{ + ProposeId: pid, Command: bcmd, ClusterVersion: c.clusterVersion, } @@ -247,7 +251,6 @@ func (c *protocolClient) fastRound(cmd *xlineapi.Command) (*fastRoundRes, error) res, err := protocolClient.Propose(ctx, req) if err != nil { errCh <- err - return } resCh <- res }() @@ -256,34 +259,29 @@ func (c *protocolClient) fastRound(cmd *xlineapi.Command) (*fastRoundRes, error) for i := 0; i < len(c.connects); i++ { select { case res := <-resCh: - c.state.checkAndUpdate(*res.LeaderId, res.Term) - switch pr := res.ExeResult.(type) { - case *curpapi.ProposeResponse_Result: - okCnt++ - switch cr := pr.Result.Result.(type) { - case *curpapi.CmdResult_Ok: - if isLeaderOK { - panic("should not set exe result twice") - } - isLeaderOK = true - err := proto.Unmarshal(cr.Ok, &exeResult) - if err != nil { - panic(err) - } - case *curpapi.CmdResult_Error: - err := proto.Unmarshal(cr.Error, &exeErr) - if err != nil { - panic(err) - } - c.logger.Info("fast round failed", zap.Any("propose ID", cmd.ProposeId)) - return nil, &CommandError{err: &exeErr} + okCnt++ + if res.GetResult() == nil { + continue + } + switch cmdRes := res.GetResult().Result.(type) { + case *curpapi.CmdResult_Ok: + if isLeaderOK { + panic("should not set exe result twice") } - case *curpapi.ProposeResponse_Error: - c.logger.Warn("propose fail", zap.Error(errors.New(pr.Error.String())), zap.Any("propose ID", cmd.ProposeId)) - default: - okCnt++ + isLeaderOK = true + err := proto.Unmarshal(cmdRes.Ok, &exeResult) + if err != nil { + panic(err) + } + case *curpapi.CmdResult_Error: + err := proto.Unmarshal(cmdRes.Error, &exeErr) + if err != nil { + panic(err) + } + return nil, &CommandError{err: &exeErr} } case err := <-errCh: + // TODO: error handling c.logger.Warn("propose fail", zap.Error(err)) if fromErr, ok := status.FromError(err); ok { msg := fromErr.Message() @@ -296,17 +294,17 @@ func (c *protocolClient) fastRound(cmd *xlineapi.Command) (*fastRoundRes, error) } if okCnt >= superQuorum && isLeaderOK { - c.logger.Info("fast round succeeded", zap.Any("propose ID", cmd.ProposeId)) + c.logger.Info("fast round succeeded", zap.Any("propose ID", pid)) return &fastRoundRes{er: &exeResult, isSucc: true}, nil } - c.logger.Info("fast round failed", zap.Any("propose ID", cmd.ProposeId)) - return &fastRoundRes{er: &exeResult, isSucc: false}, nil + c.logger.Info("fast round failed", zap.Any("propose ID", pid)) + return &fastRoundRes{isSucc: false}, nil } // The slow round of Curp protocol -func (c *protocolClient) slowRound(cmd *xlineapi.Command) (*slowRoundRes, error) { - c.logger.Info("slow round started", zap.Any("propose ID", cmd.ProposeId)) +func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command) (*slowRoundRes, error) { + c.logger.Info("slow round started", zap.Any("propose ID", pid)) var asr xlineapi.SyncResponse var er xlineapi.CommandResponse @@ -324,10 +322,7 @@ func (c *protocolClient) slowRound(cmd *xlineapi.Command) (*slowRoundRes, error) ctx, cancel := context.WithTimeout(context.Background(), c.config.ProposeTimeout) defer cancel() req := &curpapi.WaitSyncedRequest{ - ProposeId: &curpapi.ProposeId{ - ClientId: cmd.ProposeId.ClientId, - SeqNum: cmd.ProposeId.SeqNum, - }, + ProposeId: pid, ClusterVersion: c.clusterVersion, } res, err := protocolClient.WaitSynced(ctx, req) @@ -347,7 +342,7 @@ func (c *protocolClient) slowRound(cmd *xlineapi.Command) (*slowRoundRes, error) if err != nil { panic(err) } - c.logger.Info("slow round failed", zap.Any("propose ID", cmd.ProposeId)) + c.logger.Info("slow round failed", zap.Any("propose ID", pid)) return nil, &CommandError{err: &exeErr} } } @@ -363,12 +358,12 @@ func (c *protocolClient) slowRound(cmd *xlineapi.Command) (*slowRoundRes, error) if err != nil { panic(err) } - c.logger.Info("slow round failed", zap.Any("propose ID", cmd.ProposeId)) + c.logger.Info("slow round failed", zap.Any("propose ID", pid)) return nil, &CommandError{err: &exeErr} } } - c.logger.Info("slow round succeeded", zap.Any("propose ID", cmd.ProposeId)) + c.logger.Info("slow round succeeded", zap.Any("propose ID", pid)) return &slowRoundRes{ asr: &asr, er: &er, @@ -379,13 +374,13 @@ func (c *protocolClient) slowRound(cmd *xlineapi.Command) (*slowRoundRes, error) } // Generate a propose id -func (c *protocolClient) GenProposeID() (*xlineapi.ProposeId, error) { +func (c *protocolClient) GenProposeID() (*curpapi.ProposeId, error) { clientID, err := c.getClientID() if err != nil { return nil, err } seqNum := c.newSeqNum() - return &xlineapi.ProposeId{ + return &curpapi.ProposeId{ ClientId: clientID, SeqNum: seqNum, }, nil diff --git a/client/txn.go b/client/txn.go index eb8802f..b2eff61 100644 --- a/client/txn.go +++ b/client/txn.go @@ -17,7 +17,7 @@ package client import ( "sync" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" ) type Txn interface { @@ -130,11 +130,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { TxnRequest: req, }, } - pid, err := txn.curpClient.GenProposeID() - if err != nil { - return nil, err - } - cmd := xlineapi.Command{Keys: krs, Request: &requestWithToken, ProposeId: pid} + cmd := xlineapi.Command{Keys: krs, Request: &requestWithToken} res, err := txn.curpClient.Propose(&cmd, false) if err != nil { return nil, err diff --git a/client/watch.go b/client/watch.go index 78d3f48..4dc8358 100644 --- a/client/watch.go +++ b/client/watch.go @@ -18,7 +18,7 @@ import ( "context" "time" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap" "google.golang.org/grpc" diff --git a/tests/lease_test.go b/tests/lease_test.go index 23b8d3b..dd34bb4 100644 --- a/tests/lease_test.go +++ b/tests/lease_test.go @@ -19,7 +19,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/client" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap/zapcore" diff --git a/tests/lock_test.go b/tests/lock_test.go index 43d0946..f2a4400 100644 --- a/tests/lock_test.go +++ b/tests/lock_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/xline-kv/go-xline/api/xline" + "github.com/xline-kv/go-xline/api/gen/xline" "github.com/xline-kv/go-xline/client" "github.com/xline-kv/go-xline/xlog" "go.uber.org/zap/zapcore"