Skip to content

Commit

Permalink
Removing usage of ES output from monitoring code!
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Mar 5, 2020
1 parent addcfd4 commit 6788cfd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 31 deletions.
21 changes: 8 additions & 13 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@ import (
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/testing"
)

var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")

type publishClient struct {
es *elasticsearch.Client
es *eslegclient.Connection
params map[string]string
format report.Format

log *logp.Logger
}

func newPublishClient(
es *elasticsearch.Client,
es *eslegclient.Connection,
params map[string]string,
format report.Format,
) (*publishClient, error) {
Expand All @@ -70,8 +69,7 @@ func (c *publishClient) Connect() error {
params := map[string]string{
"filter_path": "features.monitoring.enabled",
}
conn := c.es.Connection()
status, body, err := conn.Request("GET", "/_xpack", "", params, nil)
status, body, err := c.es.Request("GET", "/_xpack", "", params, nil)
if err != nil {
return fmt.Errorf("X-Pack capabilities query failed with: %v", err)
}
Expand Down Expand Up @@ -161,12 +159,11 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
}

func (c *publishClient) Test(d testing.Driver) {
conn := c.es.Connection()
conn.Test(d)
c.es.Test(d)
}

func (c *publishClient) String() string {
return "publish(" + c.es.String() + ")"
return "monitoring(" + c.es.URL + ")"
}

func (c *publishClient) publishXPackBulk(params map[string]string, event publisher.Event, typ string) error {
Expand All @@ -185,8 +182,7 @@ func (c *publishClient) publishXPackBulk(params map[string]string, event publish

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
conn := c.es.Connection()
_, err := conn.SendMonitoringBulk(params, bulk[:])
_, err := c.es.SendMonitoringBulk(params, bulk[:])
return err
}

Expand All @@ -196,8 +192,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
"_routing": nil,
}

conn := c.es.Connection()
esVersion := conn.GetVersion()
esVersion := c.es.GetVersion()
if esVersion.Major < 7 {
meta["_type"] = "doc"
}
Expand Down Expand Up @@ -238,7 +233,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, result, err := conn.Bulk(getMonitoringIndexName(), "", nil, bulk[:])
_, result, err := c.es.Bulk(getMonitoringIndexName(), "", nil, bulk[:])
if err != nil {
return err
}
Expand Down
30 changes: 12 additions & 18 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/outputs"
esout "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
Expand Down Expand Up @@ -337,22 +335,18 @@ func makeClient(
return nil, err
}

esClient, err := esout.NewClient(esout.ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: url,
Proxy: proxyURL,
TLS: tlsConfig,
Username: config.Username,
Password: config.Password,
APIKey: config.APIKey,
Parameters: params,
Headers: config.Headers,
Timeout: config.Timeout,
CompressionLevel: config.CompressionLevel,
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("_xpack")),
Pipeline: nil,
}, nil)
esClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: url,
Proxy: proxyURL,
TLS: tlsConfig,
Username: config.Username,
Password: config.Password,
APIKey: config.APIKey,
Parameters: params,
Headers: config.Headers,
Timeout: config.Timeout,
CompressionLevel: config.CompressionLevel,
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 6788cfd

Please sign in to comment.