Skip to content

Commit

Permalink
Fix mysql parser handling of connection phase (elastic#8139)
Browse files Browse the repository at this point in the history
The mysql protocol parser assumes that transactions can only be started
by the client. This is true once the connection has been negotiated
("command phase"), but not during initial handshake ("connection phase").

This causes parsing problems when a connection is monitored from the
start, as sometimes the connection phase leaves the parser confused on
which side is client.

This patch modifies how client-side is detected, which can only be done
by looking at the destination port.
  • Loading branch information
adriansr committed Sep 7, 2018
1 parent a84e51a commit 50067d7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
34 changes: 20 additions & 14 deletions packetbeat/protos/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ func (stream *mysqlStream) prepareForNewMessage() {
stream.data = stream.data[stream.parseOffset:]
stream.parseState = mysqlStateStart
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}

func (mysql *mysqlPlugin) isServerPort(port uint16) bool {
for _, sPort := range mysql.ports {
if uint16(sPort) == port {
return true
}
}
return false
}

func mysqlMessageParser(s *mysqlStream) (bool, bool) {
logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)

Expand All @@ -229,12 +237,11 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.seq = hdr[3]
m.typ = hdr[4]

logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d", m.packetLength, m.seq, m.typ)
logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)

if m.seq == 0 {
if s.isClient {
// starts Command Phase

if m.typ == mysqlCmdQuery {
if m.seq == 0 && m.typ == mysqlCmdQuery {
// parse request
m.isRequest = true
m.start = s.parseOffset
Expand All @@ -245,11 +252,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

if !s.isClient {
s.isClient = true
}

} else if !s.isClient {
// parse response
m.isRequest = false
Expand All @@ -275,7 +277,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

} else {
// something else, not expected
logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.typ)
Expand Down Expand Up @@ -500,9 +501,14 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

if priv.data[dir] == nil {
dstPort := tcptuple.DstPort
if dir == 0 {
dstPort = tcptuple.SrcPort
}
priv.data[dir] = &mysqlStream{
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
isClient: mysql.isServerPort(dstPort),
}
} else {
// concatenate bytes
Expand All @@ -521,7 +527,7 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

ok, complete := mysqlMessageParser(priv.data[dir])
//logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%b complete=%b", ok, complete)
logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%v complete=%v", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
Expand Down
17 changes: 10 additions & 7 deletions packetbeat/protos/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"time"
)

const serverPort = 3306

type eventStore struct {
events []beat.Event
}
Expand All @@ -55,6 +57,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin {

var mysql mysqlPlugin
config := defaultConfig
config.Ports = []int{serverPort}
mysql.init(callback, &config)
return &mysql
}
Expand Down Expand Up @@ -82,7 +85,7 @@ func TestMySQLParser_simpleRequest(t *testing.T) {
t.Errorf("Failed to decode hex string")
}

stream := &mysqlStream{data: message, message: new(mysqlMessage)}
stream := &mysqlStream{data: message, message: new(mysqlMessage), isClient: true}

ok, complete := mysqlMessageParser(stream)

Expand Down Expand Up @@ -482,7 +485,7 @@ func testTCPTuple() *common.TCPTuple {
IPLength: 4,
BaseTuple: common.BaseTuple{
SrcIP: net.IPv4(192, 168, 0, 1), DstIP: net.IPv4(192, 168, 0, 2),
SrcPort: 6512, DstPort: 3306,
SrcPort: 6512, DstPort: serverPort,
},
}
t.ComputeHashables()
Expand Down Expand Up @@ -540,17 +543,17 @@ func Test_gap_in_response(t *testing.T) {

private := protos.ProtocolData(new(mysqlPrivateData))

private = mysql.Parse(&req, tcptuple, 0, private)
private = mysql.Parse(&resp, tcptuple, 1, private)
private = mysql.Parse(&req, tcptuple, 1, private)
private = mysql.Parse(&resp, tcptuple, 0, private)

logp.Debug("mysql", "Now sending gap..")

_, drop := mysql.GapInStream(tcptuple, 1, 10, private)
_, drop := mysql.GapInStream(tcptuple, 0, 10, private)
assert.Equal(t, true, drop)

trans := expectTransaction(t, store)
assert.NotNil(t, trans)
assert.Equal(t, trans["notes"], []string{"Packet loss while capturing the response"})
assert.Equal(t, []string{"Packet loss while capturing the response"}, trans["notes"])
}

// Test that loss of data during the request doesn't result in a
Expand All @@ -567,7 +570,7 @@ func Test_gap_in_eat_message(t *testing.T) {
"66726f6d20746573")
assert.Nil(t, err)

stream := &mysqlStream{data: reqData, message: new(mysqlMessage)}
stream := &mysqlStream{data: reqData, message: new(mysqlMessage), isClient: true}
ok, complete := mysqlMessageParser(stream)
assert.Equal(t, true, ok)
assert.Equal(t, false, complete)
Expand Down
Binary file not shown.
22 changes: 22 additions & 0 deletions packetbeat/tests/system/test_0067_mysql_connection_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from packetbeat import BaseTest

"""
Tests that the negotiation phase at the beginning of a mysql connection
doesn't leave the parser in a broken state.
"""


class Test(BaseTest):

def test_connection_phase(self):
"""
This tests that requests are still captured from a mysql stream that
starts with the "connection phase" negotiation.
"""
self.render_config_template(
mysql_ports=[3306],
)
self.run_packetbeat(pcap="mysql_connection.pcap")

objs = self.read_output()
assert len(objs) > 0

0 comments on commit 50067d7

Please sign in to comment.