Skip to content

Commit

Permalink
errors: export ErrRecordRetries, ErrRecordTimeout
Browse files Browse the repository at this point in the history
There's little reason not to export these errors, and they can be useful
for users to know what type of error they're experiencing when
producing.
  • Loading branch information
twmb committed Sep 21, 2021
1 parent ee8b12d commit cf04997
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
14 changes: 8 additions & 6 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,18 @@ var (
// Returned when trying to produce a record outside of a transaction.
errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction")

// Returned when records are unable to be produced and they hit the
// configured record timeout limit.
errRecordTimeout = errors.New("records have timed out before they were able to be produced")

errRecordRetries = errors.New("record failed after being retried too many times")

//////////////
// EXTERNAL //
//////////////

// ErrRecordTimeout is passed to produce promises when records are
// unable to be produced within the RecordDeliveryTimeout.
ErrRecordTimeout = errors.New("records have timed out before they were able to be produced")

// ErrRecordRetries is passed to produce promises when records are
// unable to be produced after RecordRetries attempts.
ErrRecordRetries = errors.New("record failed after being retried too many times")

// ErrMaxBuffered is returned when producing with manual flushing
// enabled and the maximum amount of records are buffered.
ErrMaxBuffered = errors.New("manual flushing is enabled and the maximum amount of records are buffered, cannot buffer more")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (cl *Client) waitUnknownTopic(
case <-cl.ctx.Done():
err = ErrClientClosed
case <-after:
err = errRecordTimeout
err = ErrRecordTimeout
case retriableErr, ok := <-unknown.wait:
if !ok {
cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,9 +1272,9 @@ func (b *recBatch) maybeFailErr(cfg *cfg) error {
}
}
if b.isTimedOut(cfg.recordTimeout) {
return errRecordTimeout
return ErrRecordTimeout
} else if b.tries >= cfg.recordRetries {
return errRecordRetries
return ErrRecordRetries
} else if b.owner.cl.producer.isAborting() {
return ErrAborting
}
Expand Down

0 comments on commit cf04997

Please sign in to comment.