Skip to content

Commit

Permalink
Improve ES output error insights (#25825)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored May 28, 2021
1 parent e5b976a commit 0ec13ab
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883]
- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012]
- Add support for defining explicitly named dynamic templates without path/type match criteria {pull}25422[25422]
- Improve ES output error insights. {pull}25825[25825]

*Auditbeat*

Expand Down
60 changes: 60 additions & 0 deletions libbeat/common/transport/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package transport

import (
"io"
"net"

"github.com/elastic/beats/v7/libbeat/logp"
)

type loggingConn struct {
net.Conn
logger *logp.Logger
}

func LoggingDialer(d Dialer, logger *logp.Logger) Dialer {
return DialerFunc(func(network, addr string) (net.Conn, error) {
logger := logger.With("network", network, "address", addr)
c, err := d.Dial(network, addr)
if err != nil {
logger.Errorf("Error dialing %v", err)
return nil, err
}

logger.Debugf("Completed dialing successfully")
return &loggingConn{c, logger}, nil
})
}

func (l *loggingConn) Read(b []byte) (int, error) {
n, err := l.Conn.Read(b)
if err != nil && err != io.EOF {
l.logger.Debugf("Error reading from connection: %v", err)
}
return n, err
}

func (l *loggingConn) Write(b []byte) (int, error) {
n, err := l.Conn.Write(b)
if err != nil && err != io.EOF {
l.logger.Debugf("Error writing to connection: %v", err)
}
return n, err
}
6 changes: 4 additions & 2 deletions libbeat/common/transport/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package transport

import (
"context"
"io"
"net"
)

Expand All @@ -42,7 +44,7 @@ func StatsDialer(d Dialer, s IOStatser) Dialer {

func (s *statsConn) Read(b []byte) (int, error) {
n, err := s.Conn.Read(b)
if err != nil {
if err != nil && err != io.EOF && err != context.Canceled {
s.stats.ReadError(err)
}
s.stats.ReadBytes(n)
Expand All @@ -51,7 +53,7 @@ func (s *statsConn) Read(b []byte) (int, error) {

func (s *statsConn) Write(b []byte) (int, error) {
n, err := s.Conn.Write(b)
if err != nil {
if err != nil && err != io.EOF && err != context.Canceled {
s.stats.WriteError(err)
}
s.stats.WriteBytes(n)
Expand Down
5 changes: 4 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
dialer = transport.StatsDialer(dialer, st)
tlsDialer = transport.StatsDialer(tlsDialer, st)
}
logger := logp.NewLogger("esclientleg")
dialer = transport.LoggingDialer(dialer, logger)
tlsDialer = transport.LoggingDialer(tlsDialer, logger)

var encoder BodyEncoder
compression := s.CompressionLevel
Expand Down Expand Up @@ -163,7 +166,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
ConnectionSettings: s,
HTTP: httpClient,
Encoder: encoder,
log: logp.NewLogger("esclientleg"),
log: logger,
}

if s.APIKey != "" {
Expand Down

0 comments on commit 0ec13ab

Please sign in to comment.