Skip to content

Commit

Permalink
Merge pull request #981 from DimaGolomozy/fix-sticky-buffer-index
Browse files Browse the repository at this point in the history
fix buffer index number for sticky tcp output connection
  • Loading branch information
buger authored Aug 3, 2021
2 parents 0f2f93a + 524b9df commit 024e126
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
7 changes: 3 additions & 4 deletions output_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,15 @@ func (o *TCPOutput) worker(bufferIndex int) {
}
}

func (o *TCPOutput) getBufferIndex(data []byte) int {
func (o *TCPOutput) getBufferIndex(msg *Message) int {
if !o.config.Sticky {
o.workerIndex++
return int(o.workerIndex) % o.config.Workers
}

hasher := fnv.New32a()
hasher.Write(payloadMeta(data)[1])
hasher.Write(payloadID(msg.Meta))
return int(hasher.Sum32()) % o.config.Workers

}

// PluginWrite writes message to this plugin
Expand All @@ -113,7 +112,7 @@ func (o *TCPOutput) PluginWrite(msg *Message) (n int, err error) {
return len(msg.Data), nil
}

bufferIndex := o.getBufferIndex(msg.Data)
bufferIndex := o.getBufferIndex(msg)
o.buf[bufferIndex] <- msg

if Settings.OutputTCPStats {
Expand Down
9 changes: 5 additions & 4 deletions output_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func TestBufferDistribution(t *testing.T) {
}
}

func getTestBytes() []byte {
reqh := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1)
reqb := append(reqh, []byte("GET / HTTP/1.1\r\nHost: www.w3.org\r\nUser-Agent: Go 1.1 package http\r\nAccept-Encoding: gzip\r\n\r\n")...)
return reqb
func getTestBytes() *Message {
return &Message{
Meta: payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1),
Data: []byte("GET / HTTP/1.1\r\nHost: www.w3.org\r\nUser-Agent: Go 1.1 package http\r\nAccept-Encoding: gzip\r\n\r\n"),
}
}

0 comments on commit 024e126

Please sign in to comment.