Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Separate ES client code from ES output code (#16150) #17222

Merged
merged 9 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]
- Extract Elasticsearch client logic from `outputs/elasticsearch` package into new `esclientleg` package. {pull}16150[16150]

==== Bugfixes

Expand Down
30 changes: 14 additions & 16 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,29 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/common/reload"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
Expand Down Expand Up @@ -157,7 +155,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,7 +189,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
_, err := elasticsearch.RegisterConnectCallback(callback)
Expand All @@ -211,7 +209,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}

esConfig := b.Config.Output.Config()
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
Expand Down Expand Up @@ -273,7 +271,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errs.Err()
}

func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error {
func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *eslegclient.Connection, kibanaClient *kibana.Client) error {
if isElasticsearchLoads(kibanaClient.GetVersion()) {
return reg.LoadML(esClient)
}
Expand Down Expand Up @@ -457,7 +455,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
3 changes: 2 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (p *inputsRunner) Start() {
}

// Register callback to try to load pipelines when connecting to ES.
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
p.pipelineCallbackID, err = elasticsearch.RegisterConnectCallback(callback)
Expand Down
41 changes: 31 additions & 10 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch/estest"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/esleg/eslegtest"
)

func TestLoadPipeline(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestLoadPipeline(t *testing.T) {
checkUploadedPipeline(t, client, "describe pipeline 2")
}

func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedDescription string) {
func checkUploadedPipeline(t *testing.T, client *eslegclient.Connection, expectedDescription string) {
status, response, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, status)
Expand All @@ -82,7 +82,7 @@ func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedD
}

func TestSetupNginx(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestSetupNginx(t *testing.T) {
}

func TestAvailableProcessors(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand All @@ -139,18 +139,18 @@ func TestAvailableProcessors(t *testing.T) {
assert.Contains(t, err.Error(), "ingest-hello")
}

func hasIngest(client *elasticsearch.Client) bool {
func hasIngest(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major >= 5
}

func hasIngestPipelineProcessor(client *elasticsearch.Client) bool {
func hasIngestPipelineProcessor(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major > 6 || (v.Major == 6 && v.Minor >= 5)
}

func TestLoadMultiplePipelines(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestLoadMultiplePipelines(t *testing.T) {
}

func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -237,3 +237,24 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil)
assert.Equal(t, 404, status)
}

func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection {
conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: eslegtest.GetURL(),
Timeout: 0,
})
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

conn.Encoder = eslegclient.NewJSONEncoder(nil, false)

err = conn.Connect()
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

return conn
}
14 changes: 8 additions & 6 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"

"github.com/stretchr/testify/assert"
)

func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
Expand Down Expand Up @@ -87,9 +88,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
}))
defer testESServer.Close()

testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: testESServer.URL,
}, nil)
testESClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: testESServer.URL,
Timeout: 90 * time.Second,
})
assert.NoError(t, err)

err = testESClient.Connect()
Expand Down
17 changes: 8 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,10 @@ import (
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/kibana"

"github.com/gofrs/uuid"
errw "github.com/pkg/errors"
"go.uber.org/zap"

sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -54,8 +48,10 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/common/seccomp"
"github.com/elastic/beats/v7/libbeat/dashboards"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"
"github.com/elastic/beats/v7/libbeat/management"
Expand All @@ -71,6 +67,9 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/processing"
svc "github.com/elastic/beats/v7/libbeat/service"
"github.com/elastic/beats/v7/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"
)

// Beat provides the runnable and configurable instance of a beat.
Expand Down Expand Up @@ -498,7 +497,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := elasticsearch.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
if err != nil {
return err
}
Expand Down Expand Up @@ -811,7 +810,7 @@ func (b *Beat) registerESIndexManagement() error {
}

func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
return func(esClient *eslegclient.Connection) error {
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))
return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled)
}
Expand Down Expand Up @@ -857,7 +856,7 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
var response struct {
ClusterUUID string `json:"cluster_uuid"`
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/transport/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package transport

import "net"
import (
"net"
)

func ConnWrapper(d Dialer, w func(net.Conn) net.Conn) Dialer {
return DialerFunc(func(network, addr string) (net.Conn, error) {
Expand Down
30 changes: 30 additions & 0 deletions libbeat/common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,33 @@ func EncodeURLParams(url string, params url.Values) string {

return strings.Join([]string{url, "?", params.Encode()}, "")
}

type ParseHint func(raw string) string

// ParseURL tries to parse a URL and return the parsed result.
func ParseURL(raw string, hints ...ParseHint) (*url.URL, error) {
if raw == "" {
return nil, nil
}

if len(hints) == 0 {
hints = append(hints, WithDefaultScheme("http"))
}

if strings.Index(raw, "://") == -1 {
for _, hint := range hints {
raw = hint(raw)
}
}

return url.Parse(raw)
}

func WithDefaultScheme(scheme string) ParseHint {
return func(raw string) string {
if !strings.Contains(raw, "://") {
return scheme + "://" + raw
}
return raw
}
}
Loading