Skip to content

Commit

Permalink
Merge pull request #1898 from mmaslankaprv/fixed-parsing-control-reco…
Browse files Browse the repository at this point in the history
…rd-value

Parsing only known control batches value
  • Loading branch information
d1egoaz authored Mar 11, 2021
2 parents 470106f + e299b55 commit 7285d6c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 10 deletions.
22 changes: 12 additions & 10 deletions control_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@ type ControlRecord struct {

func (cr *ControlRecord) decode(key, value packetDecoder) error {
var err error
cr.Version, err = value.getInt16()
if err != nil {
return err
}

cr.CoordinatorEpoch, err = value.getInt32()
if err != nil {
return err
}

// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
// Either way, all these version can only be 0 for now
cr.Version, err = key.getInt16()
Expand All @@ -55,6 +45,18 @@ func (cr *ControlRecord) decode(key, value packetDecoder) error {
// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
cr.Type = ControlRecordUnknown
}
// we want to parse value only if we are decoding control record of known type
if cr.Type != ControlRecordUnknown {
cr.Version, err = value.getInt16()
if err != nil {
return err
}

cr.CoordinatorEpoch, err = value.getInt32()
if err != nil {
return err
}
}
return nil
}

Expand Down
66 changes: 66 additions & 0 deletions control_record_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,67 @@
package sarama

import (
"testing"
)

var (
abortTxCtrlRecKey = []byte{
0, 0, // version
0, 0, // TX_ABORT = 0
}
abortTxCtrlRecValue = []byte{
0, 0, // version
0, 0, 0, 10, // coordinator epoch
}
commitTxCtrlRecKey = []byte{
0, 0, // version
0, 1, // TX_COMMIT = 1
}
commitTxCtrlRecValue = []byte{
0, 0, // version
0, 0, 0, 15, // coordinator epoch
}
unknownCtrlRecKey = []byte{
0, 0, // version
0, 128, // UNKNOWN = -1
}
// empty value for unknown record
unknownCtrlRecValue = []byte{}
)

func testDecode(t *testing.T, tp string, key []byte, value []byte) ControlRecord {
controlRecord := ControlRecord{}
err := controlRecord.decode(&realDecoder{raw: key}, &realDecoder{raw: value})
if err != nil {
t.Error("Decoding control record of type " + tp + " failed")
return ControlRecord{}
}
return controlRecord
}

func assertRecordType(t *testing.T, r *ControlRecord, expected ControlRecordType) {
if r.Type != expected {
t.Errorf("control record type mismatch, expected: %v, have %v", expected, r.Type)
}
}

func TestDecodingControlRecords(t *testing.T) {
abortTx := testDecode(t, "abort transaction", abortTxCtrlRecKey, abortTxCtrlRecValue)

assertRecordType(t, &abortTx, ControlRecordAbort)

if abortTx.CoordinatorEpoch != 10 {
t.Errorf("abort tx control record coordinator epoch mismatch")
}

commitTx := testDecode(t, "commit transaction", commitTxCtrlRecKey, commitTxCtrlRecValue)

if commitTx.CoordinatorEpoch != 15 {
t.Errorf("commit tx control record coordinator epoch mismatch")
}
assertRecordType(t, &commitTx, ControlRecordCommit)

unknown := testDecode(t, "unknown", unknownCtrlRecKey, unknownCtrlRecValue)

assertRecordType(t, &unknown, ControlRecordUnknown)
}

0 comments on commit 7285d6c

Please sign in to comment.