From 23e089341af23c3d5afa92df093a0a6f59cfbc99 Mon Sep 17 00:00:00 2001 From: LingKa Date: Tue, 16 Jan 2024 01:58:32 +0800 Subject: [PATCH] feat: error handling Signed-off-by: LingKa --- client/error.go | 19 ++++++++- client/protocol.go | 102 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 7 deletions(-) diff --git a/client/error.go b/client/error.go index 3094ffc..4bc267d 100644 --- a/client/error.go +++ b/client/error.go @@ -15,4 +15,21 @@ func (e *CommandError) Error() string { return fmt.Sprintf("command error: %v", e.err) } -var ErrWrongClusterVersion = errors.New("wrong cluster version") +var ErrShuttingDown = errors.New("Curp Server is shutting down") +var ErrWrongClusterVersion = errors.New("Wrong cluster version") + +type errInternalError struct { + inner error +} + +func NewErrInternalError(err error) error { + return &errInternalError{ + inner: err, + } +} + +func (e *errInternalError) Error() string { + return fmt.Sprintf("Client Internal error: %s", e.inner.Error()) +} + +var ErrTimeout = errors.New("Request timeout") diff --git a/client/protocol.go b/client/protocol.go index 77fd352..ae56c05 100644 --- a/client/protocol.go +++ b/client/protocol.go @@ -281,15 +281,22 @@ func (c *protocolClient) fastRound(pid *curpapi.ProposeId, cmd *xlineapi.Command return nil, &CommandError{err: &exeErr} } case err := <-errCh: - // TODO: error handling c.logger.Warn("propose fail", zap.Error(err)) if fromErr, ok := status.FromError(err); ok { - msg := fromErr.Message() - if msg == "wrong cluster version" { + curpErr := curpapi.CurpError{} + dtl := fromErr.Details() + err := proto.Unmarshal(dtl[0].([]byte), &curpErr) + if err != nil { + return nil, err + } + if curpErr.GetShuttingDown() != nil { + return nil, ErrShuttingDown + } else if curpErr.GetWrongClusterVersion() != nil { return nil, ErrWrongClusterVersion + } else { + continue } } - return nil, err } } @@ -311,6 +318,7 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command var exeErr xlineapi.ExecuteError retryCnt := c.config.RetryCount + retryTimeout := c.getBackoff() for i := 0; i < retryCnt; i++ { leaderID, err := c.getLeaderID() if err != nil { @@ -327,7 +335,39 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command } res, err := protocolClient.WaitSynced(ctx, req) if err != nil { - return nil, err + if fromErr, ok := status.FromError(err); ok { + curpErr := curpapi.CurpError{} + dtl := fromErr.Details() + err := proto.Unmarshal(dtl[0].([]byte), &curpErr) + if err != nil { + return nil, err + } + if curpErr.GetShuttingDown() != nil { + return nil, ErrShuttingDown + } else if curpErr.GetWrongClusterVersion() != nil { + return nil, ErrWrongClusterVersion + } else if curpErr.GetRpcTransport() != nil { + // it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again + time.Sleep(retryTimeout.nextRetry()) + err := c.resendPropose(pid, cmd, nil) + if err != nil { + return nil, err + } + continue + } else if curpErr.GetRedirect() != nil { + newLeader := curpErr.GetRedirect().GetLeaderId() + term := curpErr.GetRedirect().GetTerm() + c.state.checkAndUpdate(newLeader, term) + // resend the propose to the new leader + err := c.resendPropose(pid, cmd, &newLeader) + if err != nil { + return nil, err + } + continue + } else { + return nil, NewErrInternalError(err) + } + } } if res.AfterSyncResult != nil { @@ -370,7 +410,57 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command }, nil } - return nil, errors.New("slow round timeout") + return nil, ErrTimeout +} + +// Resend the propose only to the leader. This is used when leader changes and we need to ensure that the propose is received by the new leader. +func (c *protocolClient) resendPropose(pid *curpapi.ProposeId, cmd *xlineapi.Command, newLeader *ServerId) error { + retryTimeout := c.getBackoff() + retryCnt := c.config.RetryCount + + for i := 0; i < retryCnt; i++ { + time.Sleep(retryTimeout.nextRetry()) + + var leaderID ServerId + if newLeader != nil { + leaderID = *newLeader + } else { + res, err := c.fetchLeader() + if err != nil { + return err + } + leaderID = *res + } + + bcmd, err := proto.Marshal(cmd) + if err != nil { + return err + } + + protocolClient := curpapi.NewProtocolClient(c.connects[leaderID]) + ctx, cancel := context.WithTimeout(context.Background(), c.config.ProposeTimeout) + defer cancel() + _, err = protocolClient.Propose(ctx, &curpapi.ProposeRequest{ProposeId: pid, Command: bcmd, ClusterVersion: c.clusterVersion}) + if err != nil { + if fromErr, ok := status.FromError(err); ok { + curpErr := curpapi.CurpError{} + dtl := fromErr.Details() + err := proto.Unmarshal(dtl[0].([]byte), &curpErr) + if err != nil { + return err + } + if curpErr.GetShuttingDown() != nil { + return ErrShuttingDown + } else if curpErr.GetWrongClusterVersion() != nil { + return ErrWrongClusterVersion + } else { + continue + } + } + } + } + + return ErrTimeout } // Generate a propose id