Skip to content

Commit

Permalink
feat: error handling
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <cnfty786@gmail.com>
  • Loading branch information
LingKa28 committed Jan 15, 2024
1 parent e4f18c4 commit 23e0893
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 7 deletions.
19 changes: 18 additions & 1 deletion client/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,21 @@ func (e *CommandError) Error() string {
return fmt.Sprintf("command error: %v", e.err)
}

var ErrWrongClusterVersion = errors.New("wrong cluster version")
var ErrShuttingDown = errors.New("Curp Server is shutting down")
var ErrWrongClusterVersion = errors.New("Wrong cluster version")

type errInternalError struct {
inner error
}

func NewErrInternalError(err error) error {
return &errInternalError{
inner: err,
}

Check warning on line 28 in client/error.go

View check run for this annotation

Codecov / codecov/patch

client/error.go#L25-L28

Added lines #L25 - L28 were not covered by tests
}

func (e *errInternalError) Error() string {
return fmt.Sprintf("Client Internal error: %s", e.inner.Error())

Check warning on line 32 in client/error.go

View check run for this annotation

Codecov / codecov/patch

client/error.go#L31-L32

Added lines #L31 - L32 were not covered by tests
}

var ErrTimeout = errors.New("Request timeout")
102 changes: 96 additions & 6 deletions client/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,22 @@ func (c *protocolClient) fastRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
return nil, &CommandError{err: &exeErr}
}
case err := <-errCh:
// TODO: error handling
c.logger.Warn("propose fail", zap.Error(err))
if fromErr, ok := status.FromError(err); ok {
msg := fromErr.Message()
if msg == "wrong cluster version" {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return nil, err
}
if curpErr.GetShuttingDown() != nil {
return nil, ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {

Check warning on line 294 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L286-L294

Added lines #L286 - L294 were not covered by tests
return nil, ErrWrongClusterVersion
} else {
continue

Check warning on line 297 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L296-L297

Added lines #L296 - L297 were not covered by tests
}
}
return nil, err
}
}

Expand All @@ -311,6 +318,7 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
var exeErr xlineapi.ExecuteError

retryCnt := c.config.RetryCount
retryTimeout := c.getBackoff()
for i := 0; i < retryCnt; i++ {
leaderID, err := c.getLeaderID()
if err != nil {
Expand All @@ -327,7 +335,39 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
}
res, err := protocolClient.WaitSynced(ctx, req)
if err != nil {
return nil, err
if fromErr, ok := status.FromError(err); ok {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return nil, err
}
if curpErr.GetShuttingDown() != nil {
return nil, ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {
return nil, ErrWrongClusterVersion
} else if curpErr.GetRpcTransport() != nil {
// it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again
time.Sleep(retryTimeout.nextRetry())
err := c.resendPropose(pid, cmd, nil)
if err != nil {
return nil, err
}
continue
} else if curpErr.GetRedirect() != nil {
newLeader := curpErr.GetRedirect().GetLeaderId()
term := curpErr.GetRedirect().GetTerm()
c.state.checkAndUpdate(newLeader, term)
// resend the propose to the new leader
err := c.resendPropose(pid, cmd, &newLeader)
if err != nil {
return nil, err
}
continue
} else {
return nil, NewErrInternalError(err)
}

Check warning on line 369 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L338-L369

Added lines #L338 - L369 were not covered by tests
}
}

if res.AfterSyncResult != nil {
Expand Down Expand Up @@ -370,7 +410,57 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
}, nil
}

return nil, errors.New("slow round timeout")
return nil, ErrTimeout

Check warning on line 413 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L413

Added line #L413 was not covered by tests
}

// Resend the propose only to the leader. This is used when leader changes and we need to ensure that the propose is received by the new leader.
func (c *protocolClient) resendPropose(pid *curpapi.ProposeId, cmd *xlineapi.Command, newLeader *ServerId) error {
retryTimeout := c.getBackoff()
retryCnt := c.config.RetryCount

for i := 0; i < retryCnt; i++ {
time.Sleep(retryTimeout.nextRetry())

var leaderID ServerId
if newLeader != nil {
leaderID = *newLeader
} else {
res, err := c.fetchLeader()
if err != nil {
return err
}
leaderID = *res

Check warning on line 432 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L417-L432

Added lines #L417 - L432 were not covered by tests
}

bcmd, err := proto.Marshal(cmd)
if err != nil {
return err
}

Check warning on line 438 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L435-L438

Added lines #L435 - L438 were not covered by tests

protocolClient := curpapi.NewProtocolClient(c.connects[leaderID])
ctx, cancel := context.WithTimeout(context.Background(), c.config.ProposeTimeout)
defer cancel()
_, err = protocolClient.Propose(ctx, &curpapi.ProposeRequest{ProposeId: pid, Command: bcmd, ClusterVersion: c.clusterVersion})
if err != nil {
if fromErr, ok := status.FromError(err); ok {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return err
}
if curpErr.GetShuttingDown() != nil {
return ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {
return ErrWrongClusterVersion
} else {
continue

Check warning on line 457 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L440-L457

Added lines #L440 - L457 were not covered by tests
}
}
}
}

return ErrTimeout

Check warning on line 463 in client/protocol.go

View check run for this annotation

Codecov / codecov/patch

client/protocol.go#L463

Added line #L463 was not covered by tests
}

// Generate a propose id
Expand Down

0 comments on commit 23e0893

Please sign in to comment.