Skip to content

Commit

Permalink
[7.x] Separate ES client code from ES output code (#16150) (#17222)
Browse files Browse the repository at this point in the history
* Separate ES client code from ES output code (#16150)

* Basic extraction of ES client code from ES output code

* Move test

* Removing duplicate function in monitoring reporter

* Break import cycle

* Guard onConnect callback execution

* Replace use of field with getter

* Moving common bulk API response processing into esclientleg

* Moving API integration tests

* Fixing references in tests

* Adding developer CHANGELOG entry

* Move LoadJSON method

* Move callbacks to own file

* Move client-related constructors into client.go file

* Reducing global logging usage

* Use new constructor in test

* Passing logger in test

* Use logger in test

* Use struct fieldnames when initializing

* Use constructor in test

* Fixing typos

* Replace esclient.ParseProxyURL with generic function in common

* Imports formatting

* Moving more fields from ES output client to esclientleg.Connection

* Moving more fields

* Update test code

* Use new TLS package

* Extracting common test code into eslegtest package

* Replace uses of elasticsearch output client with esclientleg.NewConnection

* Replacing uses of ES output client struct with esclientleg.Connection

* Handle callbacks

* Fixing formatting

* Fixing import cycle

* Fixing import and package name

* Fixing imports

* More fixes

* Breaking import cycle

* Removing unused function

* Adding back missing statement

* Fixing param

* Fixing package name

* Include ES output plugin so it's registered

* Proxy handling

* Let Connection handle ProxyDisable setting

* Only parse proxy field from config if set

* Cast timeout ints

* Parse proxy URL

* Fixing proxy integration test

* Fixing ILM test

* Updating expected request count in test

* Fixing package names

* Lots more refactoring!!!

* Move timeout field

* More fixes

* Adding missing files

* No need to pass HTTP any more

* Simplifying Bulk API usage

* Removing unused code

* Remove bulk state from Connection

* Removing empty file

* Moving Bulk API response streaming parsing code back into ES output package

* Don't make monitoring bulk parsing code use streaming parser

* Replacing old HTTP struct passing

* Removing HTTP use

* Adding build tag

* Fixing up tests

* Allow default scheme to be configurable

* Adding versions to import paths

* Remove redundant check

* Undoing unnecessary heartbeat import change

* Forgot to resolve conflicts

* Fixing imports

* Running go mod tidy

* Revert "Remove redundant check"

This reverts commit c5fde6ff3be765a89c0bc20f9cae8f697d08d47e.

* Fixing args order

* Removing extraneous parameter

* Removing wrong errors package import

* Fixing order of arguments

* Fixing package name

* Instantiating logger for tests

* Making streaming JSON parser private to ES output package

* Detect and try to fix scheme before parsing URL

* Making Connection private to ES output Client

* Update test

* Replace client.Ping() calls with client.Connect() calls in test code

* Updating tests

* Removing usage of ES output from monitoring code!

* Using strings.Index instead of strings.SplitN

* Return default config via function call

* Removing "escape hatch" method to expose underlying connection from ES output client

* Using client connection in tests

* Re-implement Test() method for ES output client

* Adding back missing import / sorting imports

* Removing unused import

* Fixing up developer CHANGELOG

* Clean up rebase

* Rebase cleanup

* Making 7.x specific adaptations (ML setup in Filebeat)

* Updating go.mod and go.sum files

* Running go mod tidy
  • Loading branch information
ycombinator authored Mar 25, 2020
1 parent 96054ee commit b581b17
Show file tree
Hide file tree
Showing 46 changed files with 1,759 additions and 1,369 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]
- Extract Elasticsearch client logic from `outputs/elasticsearch` package into new `esclientleg` package. {pull}16150[16150]

==== Bugfixes

Expand Down
30 changes: 14 additions & 16 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,29 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/common/reload"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
Expand Down Expand Up @@ -157,7 +155,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,7 +189,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
_, err := elasticsearch.RegisterConnectCallback(callback)
Expand All @@ -211,7 +209,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}

esConfig := b.Config.Output.Config()
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
Expand Down Expand Up @@ -273,7 +271,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errs.Err()
}

func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error {
func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *eslegclient.Connection, kibanaClient *kibana.Client) error {
if isElasticsearchLoads(kibanaClient.GetVersion()) {
return reg.LoadML(esClient)
}
Expand Down Expand Up @@ -457,7 +455,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
3 changes: 2 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (p *inputsRunner) Start() {
}

// Register callback to try to load pipelines when connecting to ES.
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
p.pipelineCallbackID, err = elasticsearch.RegisterConnectCallback(callback)
Expand Down
41 changes: 31 additions & 10 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch/estest"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/esleg/eslegtest"
)

func TestLoadPipeline(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestLoadPipeline(t *testing.T) {
checkUploadedPipeline(t, client, "describe pipeline 2")
}

func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedDescription string) {
func checkUploadedPipeline(t *testing.T, client *eslegclient.Connection, expectedDescription string) {
status, response, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, status)
Expand All @@ -82,7 +82,7 @@ func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedD
}

func TestSetupNginx(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestSetupNginx(t *testing.T) {
}

func TestAvailableProcessors(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand All @@ -139,18 +139,18 @@ func TestAvailableProcessors(t *testing.T) {
assert.Contains(t, err.Error(), "ingest-hello")
}

func hasIngest(client *elasticsearch.Client) bool {
func hasIngest(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major >= 5
}

func hasIngestPipelineProcessor(client *elasticsearch.Client) bool {
func hasIngestPipelineProcessor(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major > 6 || (v.Major == 6 && v.Minor >= 5)
}

func TestLoadMultiplePipelines(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestLoadMultiplePipelines(t *testing.T) {
}

func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -237,3 +237,24 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil)
assert.Equal(t, 404, status)
}

func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection {
conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: eslegtest.GetURL(),
Timeout: 0,
})
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

conn.Encoder = eslegclient.NewJSONEncoder(nil, false)

err = conn.Connect()
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

return conn
}
14 changes: 8 additions & 6 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"

"github.com/stretchr/testify/assert"
)

func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
Expand Down Expand Up @@ -87,9 +88,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
}))
defer testESServer.Close()

testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: testESServer.URL,
}, nil)
testESClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: testESServer.URL,
Timeout: 90 * time.Second,
})
assert.NoError(t, err)

err = testESClient.Connect()
Expand Down
17 changes: 8 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,10 @@ import (
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/kibana"

"github.com/gofrs/uuid"
errw "github.com/pkg/errors"
"go.uber.org/zap"

sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -54,8 +48,10 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/common/seccomp"
"github.com/elastic/beats/v7/libbeat/dashboards"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"
"github.com/elastic/beats/v7/libbeat/management"
Expand All @@ -71,6 +67,9 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/processing"
svc "github.com/elastic/beats/v7/libbeat/service"
"github.com/elastic/beats/v7/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"
)

// Beat provides the runnable and configurable instance of a beat.
Expand Down Expand Up @@ -498,7 +497,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := elasticsearch.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
if err != nil {
return err
}
Expand Down Expand Up @@ -811,7 +810,7 @@ func (b *Beat) registerESIndexManagement() error {
}

func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
return func(esClient *eslegclient.Connection) error {
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))
return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled)
}
Expand Down Expand Up @@ -857,7 +856,7 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
var response struct {
ClusterUUID string `json:"cluster_uuid"`
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/transport/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package transport

import "net"
import (
"net"
)

func ConnWrapper(d Dialer, w func(net.Conn) net.Conn) Dialer {
return DialerFunc(func(network, addr string) (net.Conn, error) {
Expand Down
30 changes: 30 additions & 0 deletions libbeat/common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,33 @@ func EncodeURLParams(url string, params url.Values) string {

return strings.Join([]string{url, "?", params.Encode()}, "")
}

type ParseHint func(raw string) string

// ParseURL tries to parse a URL and return the parsed result.
func ParseURL(raw string, hints ...ParseHint) (*url.URL, error) {
if raw == "" {
return nil, nil
}

if len(hints) == 0 {
hints = append(hints, WithDefaultScheme("http"))
}

if strings.Index(raw, "://") == -1 {
for _, hint := range hints {
raw = hint(raw)
}
}

return url.Parse(raw)
}

func WithDefaultScheme(scheme string) ParseHint {
return func(raw string) string {
if !strings.Contains(raw, "://") {
return scheme + "://" + raw
}
return raw
}
}
Loading

0 comments on commit b581b17

Please sign in to comment.