Skip to content

Commit

Permalink
Compute last timestamp while accounting for system time going backwards
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Dec 9, 2019
1 parent 99c01b7 commit e4f9fe9
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions libbeat/processors/add_id/generator/es_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func ESTimeBasedUUIDGenerator() IDGenerator {

var (
sequenceNumber uint32
lastTimestamp time.Time
lastTimestamp uint64
delta uint64
once sync.Once
mac []byte
mu sync.Mutex
Expand Down Expand Up @@ -68,7 +69,7 @@ func initOnce() {
})
}

func nextIDData() (int64, uint32) {
func nextIDData() (uint64, uint32) {
mu.Lock()
defer mu.Unlock()

Expand All @@ -77,35 +78,39 @@ func nextIDData() (int64, uint32) {
// We only use bottom 3 bytes for the sequence number.
s := sequenceNumber & 0xffffff

timestamp := getTimestamp()
if s == 0 {
// Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards.
timestamp.Add(1 * time.Millisecond)
}
lastTimestamp = timestamp

t := timestamp.UnixNano() / 1000 // timestamp in ms-since-epoch
t := timestamp()
return t, s
}

func getTimestamp() time.Time {
// Don't let timestamp go backwards, at least "on our watch" (while this process is running). We are still vulnerable if we are
// shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of
// collision.
now := time.Now()
// timestamp returns the next timestamp to use, while accounting for system time going
// backwards (e.g. due to a DST change).
func timestamp() uint64 {
now := uint64(time.Now().UnixNano() / 1000)
if lastTimestamp == 0 {
// Last timestamp has not been previously initialized
lastTimestamp = now
return lastTimestamp
}

if lastTimestamp.IsZero() {
return now
// Normally now should be later than lastTimestamp, but if the system time went backwards
// (e.g. due to DST change), we should compute a delta to account for this change, so we always return
// a value that's greater than the last call to this function.
if now < lastTimestamp {
delta = lastTimestamp - now + 1
lastTimestamp = now
return now + delta
}

if lastTimestamp.After(now) {
return lastTimestamp
// If the system time was reset, reset delta as well
if now-lastTimestamp >= delta {
delta = 0
}

return now
lastTimestamp = now
return lastTimestamp
}

func packID(buf []byte, ts int64, seq uint32) {
func packID(buf []byte, ts uint64, seq uint32) {
//// We have auto-generated ids, which are usually used for append-only workloads.
//// So we try to optimize the order of bytes for indexing speed (by having quite
//// unique bytes close to the beginning of the ids so that sorting is fast) and
Expand Down

0 comments on commit e4f9fe9

Please sign in to comment.