Skip to content

Commit

Permalink
DNS - implementing Parse for Tcp.
Browse files Browse the repository at this point in the history
* Implement DNS over TCP
* Publish Notes when a response fails to decode in Gap and Fin
  • Loading branch information
McStork committed Dec 15, 2015
1 parent 7ec8245 commit e268f01
Show file tree
Hide file tree
Showing 5 changed files with 691 additions and 18 deletions.
205 changes: 193 additions & 12 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
// messages.
//
// Future Additions:
// * Implement TcpProtocolPlugin.
// * Publish a message when packets are received that cannot be decoded.
// * Publish a message when Query packets are received that cannot be decoded.
// * Add EDNS and DNSSEC support (consider using miekg/dns instead
// of gopacket).
// * Consider adding ICMP support to
Expand All @@ -30,6 +29,7 @@ import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"

"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
Expand All @@ -45,8 +45,10 @@ const (

// Notes that are added to messages during exceptional conditions.
const (
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
NonDnsCompleteMsg = "Message's data could not be decoded as DNS."
NonDnsResponsePacketMsg = "Response packet's data could not be decoded as DNS"
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
"was received so this query was closed without receiving a response."
OrphanedResponseMsg = "Response was received without an associated query."
NoResponse = "No response to this query was received."
Expand All @@ -60,6 +62,8 @@ const (
TransportUdp
)

const DecodeOffset = 2

var TransportNames = []string{
"tcp",
"udp",
Expand Down Expand Up @@ -166,7 +170,7 @@ type DnsMessage struct {
// DnsStream contains DNS data from one side of a TCP transmission. A pair
// of DnsStream's are used to represent the full conversation.
type DnsStream struct {
tcptuple *common.TcpTuple
tcpTuple *common.TcpTuple

data []byte

Expand Down Expand Up @@ -318,7 +322,7 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

dnsPkt, err := decodeDnsPacket(pkt.Payload)
dnsPkt, err := decodeDnsData(pkt.Payload)
if err != nil {
// This means that malformed requests or responses are being sent or
// that someone is attempting to the DNS port for non-DNS traffic. Both
Expand All @@ -344,6 +348,10 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
}
}

func (dns *Dns) ConnectionTimeout() time.Duration {
return dns.transactionTimeout
}

func (dns *Dns) receivedDnsRequest(tuple *DnsTuple, msg *DnsMessage) {
logp.Debug("dns", "Processing query. %s", tuple)

Expand Down Expand Up @@ -379,6 +387,11 @@ func (dns *Dns) receivedDnsResponse(tuple *DnsTuple, msg *DnsMessage) {
}

func (dns *Dns) publishTransaction(t *DnsTransaction) {
var offset int
if t.Transport == TransportTcp {
offset = DecodeOffset
}

if dns.results == nil {
return
}
Expand All @@ -402,8 +415,8 @@ func (dns *Dns) publishTransaction(t *DnsTransaction) {
event["dns"] = dnsEvent

if t.Request != nil && t.Response != nil {
event["bytes_in"] = t.Request.Length
event["bytes_out"] = t.Response.Length
event["bytes_in"] = t.Request.Length + offset
event["bytes_out"] = t.Response.Length + offset
event["responsetime"] = int32(t.Response.Ts.Sub(t.ts).Nanoseconds() / 1e6)
event["method"] = dnsOpCodeToString(t.Request.Data.OpCode)
if len(t.Request.Data.Questions) > 0 {
Expand All @@ -424,7 +437,7 @@ func (dns *Dns) publishTransaction(t *DnsTransaction) {
event["response"] = dnsToString(t.Response.Data)
}
} else if t.Request != nil {
event["bytes_in"] = t.Request.Length
event["bytes_in"] = t.Request.Length + offset
event["method"] = dnsOpCodeToString(t.Request.Data.OpCode)
if len(t.Request.Data.Questions) > 0 {
event["query"] = dnsQuestionToString(t.Request.Data.Questions[0])
Expand All @@ -437,7 +450,7 @@ func (dns *Dns) publishTransaction(t *DnsTransaction) {
event["request"] = dnsToString(t.Request.Data)
}
} else if t.Response != nil {
event["bytes_out"] = t.Response.Length
event["bytes_out"] = t.Response.Length + offset
event["method"] = dnsOpCodeToString(t.Response.Data.OpCode)
if len(t.Response.Data.Questions) > 0 {
event["query"] = dnsQuestionToString(t.Response.Data.Questions[0])
Expand Down Expand Up @@ -690,20 +703,188 @@ func nameToString(name []byte) string {
return string(s)
}

// decodeDnsPacket decodes a byte array into a DNS struct. If an error occurs
// decodeDnsData decodes a byte array into a DNS struct. If an error occurs
// then the returnd dns pointer will be nil. This method recovers from panics
// and is concurrency-safe.
func decodeDnsPacket(data []byte) (dns *layers.DNS, err error) {
func decodeDnsData(data []byte) (dns *layers.DNS, err error) {
// Recover from any panics that occur while parsing a packet.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

d := &layers.DNS{}
err = d.DecodeFromBytes(data, gopacket.NilDecodeFeedback)
if err != nil {
return nil, err
}
return d, nil
}

// TCP implementation

func (dns *Dns) Parse(pkt *protos.Packet, tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
defer logp.Recover("DNS ParseTcp")

logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

priv := dnsPrivateData{}

if private != nil {
var ok bool
priv, ok = private.(dnsPrivateData)
if !ok {
priv = dnsPrivateData{}
}
}

var payload []byte

// Offset is critical
if len(pkt.Payload) > DecodeOffset {
payload = pkt.Payload[DecodeOffset:]
}

stream := priv.Data[dir]
if stream == nil {
stream = &DnsStream{
tcpTuple: tcpTuple,
data: payload,
message: &DnsMessage{Ts: pkt.Ts, Tuple: pkt.Tuple},
}
} else {
stream.data = append(stream.data, payload...)
if len(stream.data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("dns", "Stream data too large, dropping DNS stream")
stream = nil
return priv
}
}

priv.Data[dir] = stream
data, err := decodeDnsData(stream.data)

if err != nil {
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))

// wait for decoding with the next segment
return priv
}
dns.messageComplete(tcpTuple, dir, stream, data)

return priv
}

func (dns *Dns) messageComplete(tcpTuple *common.TcpTuple, dir uint8, s *DnsStream, decodedData *layers.DNS) {
dns.handleDns(s.message, tcpTuple, dir, s.data, decodedData)

s.PrepareForNewMessage()
}

func (dns *Dns) handleDns(m *DnsMessage, tcpTuple *common.TcpTuple, dir uint8, data []byte, decodedData *layers.DNS) {
dnsTuple := DnsTupleFromIpPort(&m.Tuple, TransportTcp, decodedData.ID)
m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcpTuple.IpPort())
m.Data = decodedData
m.Length = len(data)

if decodedData.QR == Query {
dns.receivedDnsRequest(&dnsTuple, m)
} else /* Response */ {
dns.receivedDnsResponse(&dnsTuple, m)
}
}

func (stream *DnsStream) PrepareForNewMessage() {
stream.message = nil
}

func (dns *Dns) ReceivedFin(tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
if private == nil {
return private
}
dnsData, ok := private.(dnsPrivateData)
if !ok {
return private
}
if dnsData.Data[dir] == nil {
return dnsData
}
stream := dnsData.Data[dir]
if stream.message != nil {
decodedData, err := decodeDnsData(stream.data)
if err == nil {
dns.messageComplete(tcpTuple, dir, stream, decodedData)
} else /*Failed decode */ {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
stream.PrepareForNewMessage()
}
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
}
}

return dnsData
}

func (dns *Dns) GapInStream(tcpTuple *common.TcpTuple, dir uint8, nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
dnsData, ok := private.(dnsPrivateData)

if !ok {
return private, false
}

stream := dnsData.Data[dir]

if stream == nil || stream.message == nil {
return private, false
}

decodedData, err := decodeDnsData(stream.data)

// Add Notes if the failed stream is the response
if err != nil {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
}

// drop the stream because it is binary and it would be rare to have a decodable message later
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
return private, true
}

// publish and ignore the gap. No case should reach this code though ...
dns.messageComplete(tcpTuple, dir, stream, decodedData)
return private, false
}

// Add Notes to the query stream about a failure to decode the response
func (dns *Dns) publishDecodeFailureNotes(dnsData dnsPrivateData) {
streamOrigin := dnsData.Data[tcp.TcpDirectionOriginal]
streamReverse := dnsData.Data[tcp.TcpDirectionReverse]

dataOrigin, err := decodeDnsData(streamOrigin.data)
tupleReverse := streamReverse.message.Tuple

if err == nil {
dnsTupleReverse := DnsTupleFromIpPort(&tupleReverse, TransportTcp, dataOrigin.ID)
hashDnsTupleOrigin := (&dnsTupleReverse).RevHashable()

trans := dns.deleteTransaction(hashDnsTupleOrigin)

if trans == nil { // happens when a Gap is followed by Fin
return
}

trans.Notes = append(trans.Notes, NonDnsResponsePacketMsg)

dns.publishTransaction(trans)
dns.deleteTransaction(hashDnsTupleOrigin)
} else {
logp.Debug("dns", "Unabled to decode response with adresses %s has no associated query", streamReverse.tcpTuple.String())
}
}
Loading

0 comments on commit e268f01

Please sign in to comment.