Skip to content

Commit

Permalink
Separate ES client code from ES output code (#16150)
Browse files Browse the repository at this point in the history
* Basic extraction of ES client code from ES output code

* Move test

* Removing duplicate function in monitoring reporter

* Break import cycle

* Guard onConnect callback execution

* Replace use of field with getter

* Moving common bulk API response processing into esclientleg

* Moving API integration tests

* Fixing references in tests

* Adding developer CHANGELOG entry

* Move LoadJSON method

* Move callbacks to own file

* Move client-related constructors into client.go file

* Reducing global logging usage

* Use new constructor in test

* Passing logger in test

* Use logger in test

* Use struct fieldnames when initializing

* Use constructor in test

* Fixing typos

* Replace esclient.ParseProxyURL with generic function in common

* Imports formatting

* Moving more fields from ES output client to esclientleg.Connection

* Moving more fields

* Update test code

* Use new TLS package

* Extracting common test code into eslegtest package

* Replace uses of elasticsearch output client with esclientleg.NewConnection

* Replacing uses of ES output client struct with esclientleg.Connection

* Handle callbacks

* Fixing formatting

* Fixing import cycle

* Fixing import and package name

* Fixing imports

* More fixes

* Breaking import cycle

* Removing unused function

* Adding back missing statement

* Fixing param

* Fixing package name

* Include ES output plugin so it's registered

* Proxy handling

* Let Connection handle ProxyDisable setting

* Only parse proxy field from config if set

* Cast timeout ints

* Parse proxy URL

* Fixing proxy integration test

* Fixing ILM test

* Updating expected request count in test

* Fixing package names

* Lots more refactoring!!!

* Move timeout field

* More fixes

* Adding missing files

* No need to pass HTTP any more

* Simplifying Bulk API usage

* Removing unused code

* Remove bulk state from Connection

* Removing empty file

* Moving Bulk API response streaming parsing code back into ES output package

* Don't make monitoring bulk parsing code use streaming parser

* Replacing old HTTP struct passing

* Removing HTTP use

* Adding build tag

* Fixing up tests

* Allow default scheme to be configurable

* Adding versions to import paths

* Remove redundant check

* Undoing unnecessary heartbeat import change

* Forgot to resolve conflicts

* Fixing imports

* Running go mod tidy

* Revert "Remove redundant check"

This reverts commit c5fde6ff3be765a89c0bc20f9cae8f697d08d47e.

* Fixing args order

* Removing extraneous parameter

* Removing wrong errors package import

* Fixing order of arguments

* Fixing package name

* Instantiating logger for tests

* Making streaming JSON parser private to ES output package

* Detect and try to fix scheme before parsing URL

* Making Connection private to ES output Client

* Update test

* Replace client.Ping() calls with client.Connect() calls in test code

* Updating tests

* Removing usage of ES output from monitoring code!

* Using strings.Index instead of strings.SplitN

* Return default config via function call

* Removing "escape hatch" method to expose underlying connection from ES output client

* Using client connection in tests

* Re-implement Test() method for ES output client
  • Loading branch information
ycombinator authored Mar 6, 2020
1 parent 9718fa2 commit 8b8829e
Show file tree
Hide file tree
Showing 45 changed files with 1,734 additions and 1,365 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734]
- The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734]
- The newly generated Beats are using go modules to manage dependencies. {pull}16288[16288]
- Extract Elasticsearch client logic from `outputs/elasticsearch` package into new `esclientleg` package. {pull}16150[16150]

==== Bugfixes

Expand Down
26 changes: 12 additions & 14 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,27 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/common/reload"

"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
Expand Down Expand Up @@ -150,7 +148,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -184,7 +182,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
_, err := elasticsearch.RegisterConnectCallback(callback)
Expand Down Expand Up @@ -358,7 +356,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
3 changes: 2 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (p *inputsRunner) Start() {
}

// Register callback to try to load pipelines when connecting to ES.
callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
p.pipelineCallbackID, err = elasticsearch.RegisterConnectCallback(callback)
Expand Down
41 changes: 31 additions & 10 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch/estest"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/esleg/eslegtest"
)

func makeTestInfo(version string) beat.Info {
Expand All @@ -39,7 +39,7 @@ func makeTestInfo(version string) beat.Info {
}

func TestLoadPipeline(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestLoadPipeline(t *testing.T) {
checkUploadedPipeline(t, client, "describe pipeline 2")
}

func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedDescription string) {
func checkUploadedPipeline(t *testing.T, client *eslegclient.Connection, expectedDescription string) {
status, response, err := client.Request("GET", "/_ingest/pipeline/my-pipeline-id", "", nil, nil)
assert.NoError(t, err)
assert.Equal(t, 200, status)
Expand All @@ -90,7 +90,7 @@ func checkUploadedPipeline(t *testing.T, client *elasticsearch.Client, expectedD
}

func TestSetupNginx(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSetupNginx(t *testing.T) {
}

func TestAvailableProcessors(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand All @@ -147,18 +147,18 @@ func TestAvailableProcessors(t *testing.T) {
assert.Contains(t, err.Error(), "ingest-hello")
}

func hasIngest(client *elasticsearch.Client) bool {
func hasIngest(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major >= 5
}

func hasIngestPipelineProcessor(client *elasticsearch.Client) bool {
func hasIngestPipelineProcessor(client *eslegclient.Connection) bool {
v := client.GetVersion()
return v.Major > 6 || (v.Major == 6 && v.Minor >= 5)
}

func TestLoadMultiplePipelines(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestLoadMultiplePipelines(t *testing.T) {
}

func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
client := getTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}
Expand Down Expand Up @@ -245,3 +245,24 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil)
assert.Equal(t, 404, status)
}

func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection {
conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: eslegtest.GetURL(),
Timeout: 0,
})
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

conn.Encoder = eslegclient.NewJSONEncoder(nil, false)

err = conn.Connect()
if err != nil {
t.Fatal(err)
panic(err) // panic in case TestLogger did not stop test
}

return conn
}
14 changes: 8 additions & 6 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"

"github.com/stretchr/testify/assert"
)

func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
Expand Down Expand Up @@ -87,9 +88,10 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
}))
defer testESServer.Close()

testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: testESServer.URL,
}, nil)
testESClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{
URL: testESServer.URL,
Timeout: 90 * time.Second,
})
assert.NoError(t, err)

err = testESClient.Connect()
Expand Down
17 changes: 8 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,10 @@ import (
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/kibana"

"github.com/gofrs/uuid"
errw "github.com/pkg/errors"
"go.uber.org/zap"

sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -53,8 +47,10 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/common/seccomp"
"github.com/elastic/beats/v7/libbeat/dashboards"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"
"github.com/elastic/beats/v7/libbeat/management"
Expand All @@ -70,6 +66,9 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/processing"
svc "github.com/elastic/beats/v7/libbeat/service"
"github.com/elastic/beats/v7/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"
)

// Beat provides the runnable and configurable instance of a beat.
Expand Down Expand Up @@ -496,7 +495,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := elasticsearch.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
if err != nil {
return err
}
Expand Down Expand Up @@ -799,7 +798,7 @@ func (b *Beat) registerESIndexManagement() error {
}

func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
return func(esClient *eslegclient.Connection) error {
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))
return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled)
}
Expand Down Expand Up @@ -845,7 +844,7 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *elasticsearch.Client) error {
callback := func(esClient *eslegclient.Connection) error {
var response struct {
ClusterUUID string `json:"cluster_uuid"`
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/transport/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package transport

import "net"
import (
"net"
)

func ConnWrapper(d Dialer, w func(net.Conn) net.Conn) Dialer {
return DialerFunc(func(network, addr string) (net.Conn, error) {
Expand Down
30 changes: 30 additions & 0 deletions libbeat/common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,33 @@ func EncodeURLParams(url string, params url.Values) string {

return strings.Join([]string{url, "?", params.Encode()}, "")
}

type ParseHint func(raw string) string

// ParseURL tries to parse a URL and return the parsed result.
func ParseURL(raw string, hints ...ParseHint) (*url.URL, error) {
if raw == "" {
return nil, nil
}

if len(hints) == 0 {
hints = append(hints, WithDefaultScheme("http"))
}

if strings.Index(raw, "://") == -1 {
for _, hint := range hints {
raw = hint(raw)
}
}

return url.Parse(raw)
}

func WithDefaultScheme(scheme string) ParseHint {
return func(raw string) string {
if !strings.Contains(raw, "://") {
return scheme + "://" + raw
}
return raw
}
}
53 changes: 53 additions & 0 deletions libbeat/common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetUrl(t *testing.T) {
Expand Down Expand Up @@ -114,3 +115,55 @@ func TestURLParamsEncode(t *testing.T) {
assert.Equal(t, output, urlWithParams)
}
}

func TestParseURL(t *testing.T) {
tests := map[string]struct {
input string
hints []ParseHint
expected string
errorAssertFunc require.ErrorAssertionFunc
}{
"http": {
"http://host:1234/path",
nil,
"http://host:1234/path",
require.NoError,
},
"https": {
"https://host:1234/path",
nil,
"https://host:1234/path",
require.NoError,
},
"no_scheme": {
"host:1234/path",
nil,
"http://host:1234/path",
require.NoError,
},
"default_scheme_https": {
"host:1234/path",
[]ParseHint{WithDefaultScheme("https")},
"https://host:1234/path",
require.NoError,
},
"invalid": {
"foobar:port",
nil,
"",
require.Error,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
u, err := ParseURL(test.input, test.hints...)
test.errorAssertFunc(t, err)
if test.expected != "" {
require.Equal(t, test.expected, u.String())
} else {
require.Nil(t, u)
}
})
}
}
Loading

0 comments on commit 8b8829e

Please sign in to comment.