Skip to content

Commit

Permalink
test: migrate elasticsearch to testcontainers (#11207)
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj authored and MyaLongmire committed Jul 6, 2022
1 parent dd64f20 commit 7cbe4c8
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 33 deletions.
75 changes: 50 additions & 25 deletions plugins/inputs/elasticsearch_query/elasticsearch_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ package elasticsearch_query
import (
"bufio"
"context"
"fmt"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/docker/go-connections/nat"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
elastic5 "gopkg.in/olivere/elastic.v5"
)

var (
testindex = "test-elasticsearch_query-" + strconv.Itoa(int(time.Now().Unix()))
setupOnce sync.Once
const (
servicePort = "9200"
testindex = "test-elasticsearch"
)

type esAggregationQueryTest struct {
Expand Down Expand Up @@ -503,7 +505,7 @@ var testEsAggregationData = []esAggregationQueryTest{
},
}

func setupIntegrationTest() error {
func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
type nginxlog struct {
IPaddress string `json:"IP"`
Timestamp time.Time `json:"@timestamp"`
Expand All @@ -515,23 +517,40 @@ func setupIntegrationTest() error {
ResponseTime float64 `json:"response_time"`
}

container := testutil.Container{
Image: "elasticsearch:6.8.23",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"discovery.type": "single-node",
},
WaitingFor: wait.ForAll(
wait.ForLog("] mode [basic] - valid"),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
err := container.Start()
require.NoError(t, err, "failed to start container")

url := fmt.Sprintf(
"http://%s:%s", container.Address, container.Ports[servicePort],
)
e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"},
URLs: []string{url},
Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{},
}

err := e.connectToES()
err = e.connectToES()
if err != nil {
return err
return container, err
}

bulkRequest := e.esClient.Bulk()

// populate elasticsearch with nginx_logs test data file
file, err := os.Open("testdata/nginx_logs")
if err != nil {
return err
return container, err
}

defer file.Close()
Expand Down Expand Up @@ -560,42 +579,45 @@ func setupIntegrationTest() error {
Doc(logline))
}
if scanner.Err() != nil {
return err
return container, err
}

_, err = bulkRequest.Do(context.Background())
if err != nil {
return err
return container, err
}

// force elastic to refresh indexes to get new batch data
ctx := context.Background()
_, err = e.esClient.Refresh().Do(ctx)
if err != nil {
return err
return container, err
}

return nil
return container, nil
}

func TestElasticsearchQuery(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

setupOnce.Do(func() {
err := setupIntegrationTest()
require.NoError(t, err)
})
container, err := setupIntegrationTest(t)
require.NoError(t, err)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

var acc testutil.Accumulator
e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"},
URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
},
Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{},
}

err := e.connectToES()
err = e.connectToES()
require.NoError(t, err)

var aggs []esAggregation
Expand Down Expand Up @@ -641,23 +663,26 @@ func TestElasticsearchQuery_getMetricFields(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

setupOnce.Do(func() {
err := setupIntegrationTest()
require.NoError(t, err)
})
container, err := setupIntegrationTest(t)
require.NoError(t, err)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

type args struct {
ctx context.Context
aggregation esAggregation
}

e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"},
URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
},
Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{},
}

err := e.connectToES()
err = e.connectToES()
require.NoError(t, err)

type test struct {
Expand Down
95 changes: 87 additions & 8 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,55 @@ package elasticsearch

import (
"context"
"fmt"
"math"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)

const servicePort = "9200"

func launchTestContainer(t *testing.T) *testutil.Container {
container := testutil.Container{
Image: "elasticsearch:6.8.23",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"discovery.type": "single-node",
},
WaitingFor: wait.ForAll(
wait.ForLog("] mode [basic] - valid"),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
err := container.Start()
require.NoError(t, err, "failed to start container")

return &container
}

func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down Expand Up @@ -49,7 +79,14 @@ func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down Expand Up @@ -85,7 +122,14 @@ func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down Expand Up @@ -122,7 +166,14 @@ func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down Expand Up @@ -181,7 +232,14 @@ func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) {
},
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

for _, test := range tests {
e := &Elasticsearch{
Expand Down Expand Up @@ -224,7 +282,14 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

ctx := context.Background()

Expand All @@ -248,7 +313,14 @@ func TestTemplateManagementIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down Expand Up @@ -276,7 +348,14 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}

e := &Elasticsearch{
URLs: urls,
Expand Down

0 comments on commit 7cbe4c8

Please sign in to comment.