Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #16215 to 7.x: azure-event hub: improve error handling and stop input if the event has not been processed correctly #17195

Merged
merged 2 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ingress nginx controller fileset {pull}16197[16197]
- move create-[module,fileset,fields] to mage and enable in x-pack/filebeat {pull}15836[15836]
- Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121]
- Work on e2e ACK's for the azure-eventhub input {issue}15671[15671] {pull}16215[16215]
- Add MQTT input. {issue}15602[15602] {pull}16204[16204]
- Add a TLS test and more debug output to httpjson input {pull}16315[16315]
- Add an SSL config example in config.yml for filebeat MISP module. {pull}16320[16320]
Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ License type (autodetected): MIT

--------------------------------------------------------------------
Dependency: github.com/Azure/azure-event-hubs-go/v3
Version: v3.1.0
Version: v3.1.2
License type (autodetected): MIT
./vendor/github.com/Azure/azure-event-hubs-go/v3/LICENSE:
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee // indirect
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible
code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.1.0
github.com/Azure/azure-event-hubs-go/v3 v3.1.2
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
github.com/Azure/azure-event-hubs-go/v3 v3.1.0 h1:j+/WXzke3PTRu5gAgSpWgWJVfpwIyaedIqqgdgkjAe0=
github.com/Azure/azure-event-hubs-go/v3 v3.1.0/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
github.com/Azure/azure-event-hubs-go/v3 v3.1.2 h1:S/NjCZ1Z2R4rHJd2Hbbad6rIhxJ4lZZebKTsKHweX4A=
github.com/Azure/azure-event-hubs-go/v3 v3.1.2/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.2.1 h1:OLBdZJ3yvOn2MezlWvbrBMTEUQC72zAftRZOMdj5HYo=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 28 additions & 22 deletions vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ github.com/Azure/azure-amqp-common-go/v3/internal/tracing
github.com/Azure/azure-amqp-common-go/v3/rpc
github.com/Azure/azure-amqp-common-go/v3/sas
github.com/Azure/azure-amqp-common-go/v3/uuid
# github.com/Azure/azure-event-hubs-go/v3 v3.1.0
# github.com/Azure/azure-event-hubs-go/v3 v3.1.2
github.com/Azure/azure-event-hubs-go/v3
github.com/Azure/azure-event-hubs-go/v3/atom
github.com/Azure/azure-event-hubs-go/v3/eph
Expand Down
10 changes: 9 additions & 1 deletion x-pack/filebeat/input/azureeventhub/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azureeventhub

import (
"context"
"errors"
"fmt"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
Expand Down Expand Up @@ -48,8 +49,15 @@ func (a *azureInput) runWithEPH() error {
// register a message handler -- many can be registered
handlerID, err := a.processor.RegisterHandler(a.workerCtx,
func(c context.Context, e *eventhub.Event) error {
var onEventErr error
// partitionID is not yet mapped in the azure-eventhub sdk
return a.processEvents(e, "")
ok := a.processEvents(e, "")
if !ok {
onEventErr = errors.New("OnEvent function returned false. Stopping input worker")
a.log.Debug(onEventErr.Error())
a.Stop()
}
return onEventErr
})
if err != nil {
return err
Expand Down
34 changes: 17 additions & 17 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type azureInput struct {
workerWg sync.WaitGroup // waits on worker goroutine.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
hub *eventhub.Hub // hub will be assigned
ackChannel chan int
}

const (
Expand All @@ -66,14 +67,6 @@ func NewInput(
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrapf(err, "reading %s input config", inputName)
}
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
if err != nil {
return nil, err
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
Expand All @@ -88,17 +81,24 @@ func NewInput(
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)

input := &azureInput{
in := &azureInput{
config: config,
log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", config.ConnectionString),
outlet: out,
context: inputContext,
workerCtx: workerCtx,
workerCancel: workerCancel,
}

input.log.Infof("Initialized %s input.", inputName)
return input, nil
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
if err != nil {
return nil, err
}
in.outlet = out
in.log.Infof("Initialized %s input.", inputName)
return in, nil
}

// Run starts the input worker then returns. Only the first invocation
Expand Down Expand Up @@ -176,7 +176,7 @@ func (a *azureInput) Wait() {
a.Stop()
}

func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) error {
func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool {
timestamp := time.Now()
azure := common.MapStr{
// partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable
Expand All @@ -195,12 +195,13 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) er
"message": msg,
"azure": azure,
},
Private: event.Data,
})
if !ok {
return errors.New("event has not been sent")
return ok
}
}
return nil
return true
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
Expand All @@ -209,7 +210,6 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
err := json.Unmarshal(bMessage, &obj)
if err != nil {
a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err)
return []string{string(bMessage)}
}
var messages []string
if len(obj[expandEventListFromField]) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/azureeventhub/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestProcessEvents(t *testing.T) {
Data: []byte(msg),
SystemProperties: &properties,
}
err = input.processEvents(&ev, "0")
if err != nil {
t.Fatal(err)
ok := input.processEvents(&ev, "0")
if !ok {
t.Fatal("OnEvent function returned false")
}
assert.Equal(t, len(o.Events), 1)
message, err := o.Events[0].Fields.GetValue("message")
Expand Down