diff --git a/config.go b/config.go index 030ba926e..6c61bfc66 100644 --- a/config.go +++ b/config.go @@ -130,7 +130,7 @@ type Config struct { // Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the // manual use of an OffsetManager but will eventually be automated. Offsets struct { - // How frequently to commit updated offsets. Defaults to 10s. + // How frequently to commit updated offsets. Defaults to 1s. CommitInterval time.Duration // The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest. @@ -175,7 +175,7 @@ func NewConfig() *Config { c.Consumer.MaxWaitTime = 250 * time.Millisecond c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false - c.Consumer.Offsets.CommitInterval = 10 * time.Second + c.Consumer.Offsets.CommitInterval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest c.ChannelBufferSize = 256 diff --git a/offset_manager.go b/offset_manager.go index ca7cd173f..65abcec4c 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -311,9 +311,11 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) { pom.lock.Lock() defer pom.lock.Unlock() - pom.offset = offset - pom.metadata = metadata - pom.dirty = true + if offset > pom.offset { + pom.offset = offset + pom.metadata = metadata + pom.dirty = true + } } func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {