Skip to content

Commit

Permalink
Adapt packetbeat tcp protocol generator to new publisher pipeline (el…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored and tsg committed Aug 24, 2017
1 parent 5fde3b2 commit 4f46830
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
30 changes: 17 additions & 13 deletions packetbeat/scripts/tcp-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,21 @@ func echo(sock net.Conn) {
Create analyzer skeleton from code generator template.

```
$ cd ${GOPATH}/src/github.com/elastic/beats/packetbeat/protos
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/script/create_tcp_protocol.py
$ cd ${GOPATH}/src/github.com/elastic/beats/packetbeat
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/scripts/create_tcp_protocol.py
```

Load plugin into packetbeat by adding `_ "github.com/elastic/beats/packetbeat/protos/echo"` to packetbeat import list in `$GOPATH/src/github.com/elastic/beats/packetbeat/main.go`
Load plugin into packetbeat by running `make update`. Or add `_
"github.com/elastic/beats/packetbeat/protos/echo"` to the import list in
`$GOPATH/src/github.com/elastic/beats/packetbeat/include/list.go`.

### 2.2 Standalone beat with protocol analyzer (echo):

Use packetbeat as framework to build custom beat (e.g. for testing) with
selected protocol plugins only. A protocol plugin can still be added to
packetbeat later by copying the final plugin to
`$GOPATH/src/github.com/elastic/beats/packetbeat/protos` and importing module in
`$GOPATH/src/github.com/elastic/beats/packetbeat/main.go`.
`$GOPATH/src/github.com/elastic/beats/packetbeat/include/list.go`.

Create custom beat (e.g. github.com/<username>/pb_echo):

Expand All @@ -134,7 +136,7 @@ import (
"github.com/elastic/beats/packetbeat/beater"
// import supported protocol modules
_ "github.com/urso/pb_echo/protos/echo"
_ "github.com/urso/pb_echo/protos/echo"
)
var Name = "pb_echo"
Expand All @@ -152,7 +154,7 @@ Create protocol analyzer module (use name ‘echo’ for new protocol):
```
$ mkdir proto
$ cd proto
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/script/create_tcp_protocol.py
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/scripts/create_tcp_protocol.py
```

### 3 Implement application layer analyzer
Expand Down Expand Up @@ -226,7 +228,7 @@ If possible you can use third-party libraries for parsing messages. This might r
### 3.2 Add additional fields to transaction event

```
func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
func (pub *transPub) createEvent(requ, resp *message) beat.Event {
status := common.OK_STATUS
if resp.failed {
status = common.ERROR_STATUS
Expand All @@ -246,8 +248,7 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
Proc: string(requ.CmdlineTuple.Dst),
}
event := common.MapStr{
"@timestamp": common.Time(requ.Ts),
fields := common.MapStr{
"type": "echo",
"status": status,
"responsetime": responseTime,
Expand All @@ -259,17 +260,20 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
// add processing notes/errors to event
if len(requ.Notes)+len(resp.Notes) > 0 {
event["notes"] = append(requ.Notes, resp.Notes...)
fields["notes"] = append(requ.Notes, resp.Notes...)
}
if pub.sendRequest {
event["request"] = requ.content
fields["request"] = requ.content
}
if pub.sendResponse {
event["response"] = requ.content
fields["response"] = requ.content
}
return event
return beat.Event{
Timestamp: requ.Ts,
Fields: fields,
}
}
```

Expand Down
25 changes: 14 additions & 11 deletions packetbeat/scripts/tcp-protocol/{protocol}/pub.go.tmpl
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package {protocol}

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/packetbeat/publish"

"github.com/elastic/beats/packetbeat/protos"
)

// Transaction Publisher.
type transPub struct {
sendRequest bool
sendResponse bool

results publish.Transactions
results protos.Reporter
}

func (pub *transPub) onTransaction(requ, resp *message) error {
if pub.results == nil {
return nil
}

event := pub.createEvent(requ, resp)
pub.results.PublishTransaction(event)
pub.results(pub.createEvent(requ, resp))
return nil
}

func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
func (pub *transPub) createEvent(requ, resp *message) beat.Event {
status := common.OK_STATUS

// resp_time in milliseconds
Expand All @@ -40,8 +41,7 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
Proc: string(requ.CmdlineTuple.Dst),
}

event := common.MapStr{
"@timestamp": common.Time(requ.Ts),
fields := common.MapStr{
"type": "{protocol}",
"status": status,
"responsetime": responseTime,
Expand All @@ -53,15 +53,18 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {

// add processing notes/errors to event
if len(requ.Notes)+len(resp.Notes) > 0 {
event["notes"] = append(requ.Notes, resp.Notes...)
fields["notes"] = append(requ.Notes, resp.Notes...)
}

if pub.sendRequest {
// event["request"] =
// fields["request"] =
}
if pub.sendResponse {
// event["response"] =
// fields["response"] =
}

return event
return beat.Event{
Timestamp: requ.Ts,
Fields: fields,
}
}
5 changes: 2 additions & 3 deletions packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
)

// {plugin_type} application level protocol analyzer plugin
Expand Down Expand Up @@ -45,7 +44,7 @@ func init() {
// New create and initializes a new {protocol} protocol analyzer instance.
func New(
testMode bool,
results publish.Transactions,
results protos.Reporter,
cfg *common.Config,
) (protos.Plugin, error) {
p := &{plugin_type}{}
Expand All @@ -62,7 +61,7 @@ func New(
return p, nil
}

func ({plugin_var} *{plugin_type}) init(results publish.Transactions, config *{protocol}Config) error {
func ({plugin_var} *{plugin_type}) init(results protos.Reporter, config *{protocol}Config) error {
if err := {plugin_var}.setFromConfig(config); err != nil {
return err
}
Expand Down

0 comments on commit 4f46830

Please sign in to comment.