[Filebeat] Kafka input, json payload #26833

Closed
4 changes: 4 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
@@ -168,6 +168,10 @@ Configuration options for Kerberos authentication.

See <<configuration-kerberos>> for more information.

===== `payload_type`

Controls how the input handles the Kafka message payload. Defaults to `"string"`, which puts the raw payload into the `message` field. The other option, `"json"`, attempts to parse the payload as JSON and merges the resulting structure into the top level of the event.
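
For example, a minimal sketch of an input that parses each Kafka message as JSON (broker address and topic name below are illustrative, not defaults):

[source,yaml]
----
filebeat.inputs:
- type: kafka
  hosts: ["localhost:9092"]
  topics: ["events"]
  group_id: "filebeat"
  payload_type: "json"
----

With `payload_type: "json"`, a payload such as `{"user": "alice"}` produces an event with a top-level `user` field instead of a `message` field.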

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

15 changes: 15 additions & 0 deletions filebeat/input/kafka/config.go
@@ -53,6 +53,7 @@ type kafkaInputConfig struct {
	Username                 string `config:"username"`
	Password                 string `config:"password"`
	ExpandEventListFromField string `config:"expand_event_list_from_field"`
	PayloadType              string `config:"payload_type"`
}

type kafkaFetch struct {
@@ -127,6 +128,7 @@ func defaultConfig() kafkaInputConfig {
			MaxRetries:   4,
			RetryBackoff: 2 * time.Second,
		},
		PayloadType: "string",
	}
}

@@ -143,6 +145,10 @@ func (c *kafkaInputConfig) Validate() error {
	if c.Username != "" && c.Password == "" {
		return fmt.Errorf("password must be set when username is configured")
	}

	if !stringInSlice(c.PayloadType, []string{"string", "json"}) {
		return fmt.Errorf("invalid value for payload_type: %s, supported values are: string, json", c.PayloadType)
	}
	return nil
}

@@ -272,3 +278,12 @@ func (is *isolationLevel) Unpack(value string) error {
	*is = isolationLevel
	return nil
}

func stringInSlice(str string, list []string) bool {
	for _, v := range list {
		if v == str {
			return true
		}
	}
	return false
}
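
As a quick standalone check of the rule above, a sketch that mirrors the new validation (the helper name here is hypothetical; the real check lives in `(*kafkaInputConfig).Validate`):

```go
package main

import "fmt"

// validatePayloadType mirrors the Validate check added above:
// only "string" and "json" are accepted for payload_type.
func validatePayloadType(payloadType string) error {
	for _, v := range []string{"string", "json"} {
		if v == payloadType {
			return nil
		}
	}
	return fmt.Errorf("invalid value for payload_type: %s, supported values are: string, json", payloadType)
}

func main() {
	fmt.Println(validatePayloadType("json")) // <nil>
	fmt.Println(validatePayloadType("xml"))  // invalid value for payload_type: xml, supported values are: string, json
}
```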
144 changes: 106 additions & 38 deletions filebeat/input/kafka/input.go
@@ -110,6 +110,7 @@ func (input *kafkaInput) runConsumerGroup(
		outlet: input.outlet,
		// expandEventListFromField will be assigned the configuration option expand_event_list_from_field
		expandEventListFromField: input.config.ExpandEventListFromField,
		payloadType:              input.config.PayloadType,
		log:                      input.log,
	}

@@ -241,6 +242,7 @@ type groupHandler struct {
	// if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned
	// e.g. the Azure filesets, where the events are found under the json object "records"
	expandEventListFromField string
	payloadType              string
	log                      *logp.Logger
}

@@ -275,30 +277,15 @@ func (h *groupHandler) createEvents(
		kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
	}

-	// if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed
-	var events []beat.Event
-	var messages []string
-	if h.expandEventListFromField == "" {
-		messages = []string{string(message.Value)}
-	} else {
-		messages = h.parseMultipleMessages(message.Value)
-	}
-	for _, msg := range messages {
-		event := beat.Event{
-			Timestamp: timestamp,
-			Fields: common.MapStr{
-				"message": msg,
-				"kafka":   kafkaFields,
-			},
-			Private: eventMeta{
-				handler: h,
-				message: message,
-			},
-		}
-		events = append(events, event)
-	}
-	return events
	switch h.payloadType {
	case "string":
		return h.parseStringPayload(message, timestamp, kafkaFields)
	case "json":
		return h.parseJsonPayload(message, timestamp, kafkaFields)
	default:
		// unreachable after Validate, but fall back to string handling defensively
		return h.parseStringPayload(message, timestamp, kafkaFields)
	}
}

func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -335,24 +322,105 @@ func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	return nil
}

-// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
-func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
-	var obj map[string][]interface{}
-	err := json.Unmarshal(bMessage, &obj)
-	if err != nil {
-		h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
-		return []string{}
-	}
-	var messages []string
-	if len(obj[h.expandEventListFromField]) > 0 {
-		for _, ms := range obj[h.expandEventListFromField] {
-			js, err := json.Marshal(ms)
-			if err == nil {
-				messages = append(messages, string(js))
-			} else {
-				h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
-			}
-		}
-	}
-	return messages
-}
func (h *groupHandler) parseStringPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
	// without expand_event_list_from_field, the whole payload becomes the message field of a single event
	if h.expandEventListFromField == "" {
		return []beat.Event{
			{
				Timestamp: timestamp,
				Fields: common.MapStr{
					"message": string(message.Value),
					"kafka":   kafkaFields,
				},
				Private: eventMeta{
					handler: h,
					message: message,
				},
			},
		}
	}

	// otherwise, split the message into multiple events based on the group field provided by the configuration
	var obj map[string][]common.MapStr
	if err := json.Unmarshal(message.Value, &obj); err != nil {
		h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
		return []beat.Event{}
	}
	var events []beat.Event
	for _, ms := range obj[h.expandEventListFromField] {
		js, err := json.Marshal(ms)
		if err != nil {
			h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err)
			continue
		}
		events = append(events, beat.Event{
			Timestamp: timestamp,
			Fields: common.MapStr{
				"message": string(js),
				"kafka":   kafkaFields,
			},
			Private: eventMeta{
				handler: h,
				message: message,
			},
		})
	}
	return events
}

func (h *groupHandler) parseJsonPayload(message *sarama.ConsumerMessage, timestamp time.Time, kafkaFields common.MapStr) []beat.Event {
	// without expand_event_list_from_field, parse the whole payload and merge it into the top level of a single event
	if h.expandEventListFromField == "" {
		var obj common.MapStr
		if err := json.Unmarshal(message.Value, &obj); err != nil {
			h.log.Errorw("Kafka deserializing json payload failed", "error", err)
			return []beat.Event{}
		}
		fields := common.MapStr{
			"kafka": kafkaFields,
		}
		fields.Update(obj)
		return []beat.Event{
			{
				Timestamp: timestamp,
				Fields:    fields,
				Private: eventMeta{
					handler: h,
					message: message,
				},
			},
		}
	}

	// otherwise, split the message into multiple events based on the group field provided by the configuration
	var obj map[string][]common.MapStr
	if err := json.Unmarshal(message.Value, &obj); err != nil {
		h.log.Errorw(fmt.Sprintf("Kafka deserializing multiple messages using the group object %s", h.expandEventListFromField), "error", err)
		return []beat.Event{}
	}
	var events []beat.Event
	for _, ms := range obj[h.expandEventListFromField] {
		fields := common.MapStr{
			"kafka": kafkaFields,
		}
		fields.Update(ms)
		events = append(events, beat.Event{
			Timestamp: timestamp,
			Fields:    fields,
			Private: eventMeta{
				handler: h,
				message: message,
			},
		})
	}
	return events
}
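
To illustrate the `json` merge semantics implemented by `parseJsonPayload` above, a minimal standalone sketch (it approximates `common.MapStr.Update` with a plain map copy; the topic and field values are made up):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A Kafka message value, as consumed with payload_type: "json".
	payload := []byte(`{"user": "alice", "action": "login"}`)

	var obj map[string]interface{}
	if err := json.Unmarshal(payload, &obj); err != nil {
		// parseJsonPayload logs the error and emits no event in this case.
		fmt.Println("deserializing json payload failed:", err)
		return
	}

	// Start with the kafka metadata, then merge the decoded payload into
	// the top level, as fields.Update(obj) does in the input.
	fields := map[string]interface{}{
		"kafka": map[string]interface{}{"topic": "auth", "partition": 0},
	}
	for k, v := range obj {
		fields[k] = v
	}

	out, _ := json.Marshal(fields)
	fmt.Println(string(out))
	// Output: {"action":"login","kafka":{"partition":0,"topic":"auth"},"user":"alice"}
}
```

When `expand_event_list_from_field` is also set (for example to `records`), both handlers instead expect a payload like `{"records": [...]}` and emit one event per array element.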