Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to CFT region and add more robust tracking and cleanup of stacks. #3701

Merged
merged 15 commits into from
Nov 14, 2023
Merged
5 changes: 2 additions & 3 deletions .buildkite/hooks/pre-command
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ DOCKER_REGISTRY="docker.elastic.co"
DOCKER_REGISTRY_SECRET_PATH="kv/ci-shared/platform-ingest/docker_registry_prod"
CI_DRA_ROLE_PATH="kv/ci-shared/release/dra-role"
CI_GCP_OBS_PATH="kv/ci-shared/observability-ingest/cloud/gcp"
# CI_AGENT_QA_OBS_PATH="kv/ci-shared/observability-ingest/elastic-agent-ess-qa"
CI_ESS_STAGING_PATH="kv/ci-shared/platform-ingest/platform-ingest-ec-staging"
CI_ESS_PATH="kv/ci-shared/platform-ingest/platform-ingest-ec-prod"
CI_DRA_ROLE_PATH="kv/ci-shared/release/dra-role"


Expand Down Expand Up @@ -55,7 +54,7 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" ==
export TEST_INTEG_AUTH_GCP_SERVICE_TOKEN_FILE=$(realpath ./gcp.json)

# ESS credentials
export API_KEY_TOKEN=$(vault kv get -field apiKey ${CI_ESS_STAGING_PATH})
export API_KEY_TOKEN=$(vault kv get -field apiKey ${CI_ESS_PATH})
echo ${API_KEY_TOKEN} > ./apiKey
export TEST_INTEG_AUTH_ESS_APIKEY_FILE=$(realpath ./apiKey)
fi
Expand Down
6 changes: 5 additions & 1 deletion .buildkite/hooks/pre-exit
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" ==

# Perform cleanup of integration tests resources
echo "--- Cleaning up integration test resources"
TEST_INTEG_AUTH_ESS_REGION=us-east-1 SNAPSHOT=true mage integration:clean
if [[ "$BUILDKITE_STEP_KEY" == "serverless-integration-tests" ]]; then
STACK_PROVISIONER=serverless SNAPSHOT=true mage integration:clean
else
SNAPSHOT=true mage integration:clean
fi
fi

if [ -n "$GOOGLE_APPLICATION_CREDENTIALS" ]; then
Expand Down
4 changes: 0 additions & 4 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ steps:

- label: "Serverless integration test"
key: "serverless-integration-tests"
env:
TEST_INTEG_AUTH_ESS_REGION: us-east-1
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
artifact_paths:
- "build/TEST-**"
Expand All @@ -150,8 +148,6 @@ steps:

- label: "Integration tests"
key: "integration-tests"
env:
TEST_INTEG_AUTH_ESS_REGION: us-east-1
command: ".buildkite/scripts/steps/integration_tests.sh stateful"
artifact_paths:
- "build/TEST-**"
Expand Down
9 changes: 5 additions & 4 deletions magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,15 +1752,16 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
}
datacenter := os.Getenv("TEST_INTEG_AUTH_GCP_DATACENTER")
if datacenter == "" {
// us-central1-a is used because T2A instances required for ARM64 testing are only
// available in the central regions
datacenter = "us-central1-a"
}

// Valid values are gcp-us-central1 (default), azure-eastus2,
// aws-eu-central-1, us-east-1 (which is an AWS region but the
// "aws" CSP prefix is not used by ESS for some reason!)
// Possible to change the region for deployment, default is gcp-us-west2 which is
// the CFT region.
essRegion := os.Getenv("TEST_INTEG_AUTH_ESS_REGION")
if essRegion == "" {
essRegion = "gcp-us-central1"
essRegion = "gcp-us-west2"
}

instanceProvisionerMode := os.Getenv("INSTANCE_PROVISIONER")
Expand Down
7 changes: 6 additions & 1 deletion pkg/testing/ess/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ type Config struct {
}

func defaultConfig() *Config {
baseURL := os.Getenv("TEST_INTEG_AUTH_ESS_URL")
if baseURL == "" {
baseURL = "https://cloud.elastic.co"
}
url := strings.TrimRight(baseURL, "/") + "/api/v1"
return &Config{
BaseUrl: `https://staging.found.no/api/v1`,
BaseUrl: url,
}
}

Expand Down
149 changes: 65 additions & 84 deletions pkg/testing/ess/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"strings"
"time"

"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent/pkg/testing/runner"
)

Expand Down Expand Up @@ -62,89 +60,77 @@ func (p *provisioner) SetLogger(l runner.Logger) {
p.logger = l
}

func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequest) ([]runner.Stack, error) {
results := make(map[runner.StackRequest]*CreateDeploymentResponse)
for _, r := range requests {
// allow up to 2 minutes for each create request
createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute)
resp, err := p.createDeployment(createCtx, r,
map[string]string{
"division": "engineering",
"org": "ingest",
"team": "elastic-agent",
"project": "elastic-agent",
"integration-tests": "true",
})
createCancel()
if err != nil {
return nil, err
}
results[r] = resp
}
// Create creates a stack.
func (p *provisioner) Create(ctx context.Context, request runner.StackRequest) (runner.Stack, error) {
// allow up to 2 minutes for request
createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can pass timeout as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above on why I didn't do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

I didn't get why not have the timeout configurable.

See above on why I didn't do that.

I read up to the top of the file and didn't get the reasoning to have the timeout hardcoded...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he meant the comment chain in https://github.com/elastic/elastic-agent/pull/3701/files#r1383088811 which might have been above at one point but isn't necessarily now.

defer createCancel()
resp, err := p.createDeployment(createCtx, request,
map[string]string{
"division": "engineering",
"org": "ingest",
"team": "elastic-agent",
"project": "elastic-agent",
"integration-tests": "true",
})
if err != nil {
return runner.Stack{}, err
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
}
return runner.Stack{
ID: request.ID,
Version: request.Version,
Elasticsearch: resp.ElasticsearchEndpoint,
Kibana: resp.KibanaEndpoint,
Username: resp.Username,
Password: resp.Password,
Internal: map[string]interface{}{
"deployment_id": resp.ID,
},
Ready: false,
}, nil
}

// set a long timeout
// this context travels up to the magefile, clients that want a shorter timeout can set
// it via mage's -t flag
readyCtx, readyCancel := context.WithTimeout(ctx, 25*time.Minute)
defer readyCancel()

g, gCtx := errgroup.WithContext(readyCtx)
for req, resp := range results {
g.Go(func(req runner.StackRequest, resp *CreateDeploymentResponse) func() error {
return func() error {
ready, err := p.client.DeploymentIsReady(gCtx, resp.ID, 30*time.Second)
if err != nil {
return fmt.Errorf("failed to check for cloud %s to be ready: %w", req.Version, err)
}
if !ready {
return fmt.Errorf("cloud %s never became ready: %w", req.Version, err)
}
return nil
}
}(req, resp))
// WaitForReady should block until the stack is ready or the context is cancelled.
func (p *provisioner) WaitForReady(ctx context.Context, stack runner.Stack) (runner.Stack, error) {
deploymentID, err := p.getDeploymentID(stack)
if err != nil {
return stack, fmt.Errorf("failed to get deployment ID from the stack: %w", err)
}
err := g.Wait()
// allow up to 10 minutes for it to become ready
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make the timeout a parameter passed from the outside maybe ?

(there are other instances throughout the code, all with slightly different timeouts, I would really prefer if we could configure them elsewhere and pass them in to the provisioners functions)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it this way allowing the provisioner code to make the decision on what is the best timeout to be used based on what is being used. That way a timeout from a setting or default value doesn't effect the needed time. I believe its the provisioners that should be determine the appropriate amount of time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it seems to me that we have at least one occurrence where these timeouts are related
https://github.com/elastic/elastic-agent/pull/3701/files#diff-488eb64eb90cc8cd9d898511bc73a09b146c6704f3ce57d9e88d0189cd007498R693-R695

or is it just coincidence?

The timeout should be how long the caller is willing to wait, so why can't we pass that in ?

In general I don't see the downside of putting the timeouts as parameters (even if at first we just pass constants in it) rather than hardcoding different timeouts in different functions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That timeout places a hard limit just for caution. I could remove it, but if the provisioner doesn't place a timeout on its own execution then it would result in it blocking forever. If you prefer I can remove it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it as it is, having only the context, if the caller wants a timeout, it can set the timeout in the context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the provisioner setting a timeout in case its caller does not, so we can avoid getting stuck wiating for ever, but the caller should have the flexibility to choose how long it is willing to wait.

I like the idea of just using the context, the provisioner can check whether there is a deadline set in the context, if there isn't one, then it can set a hardcoded default one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how it is.

Copy link
Member

@cmacknz cmacknz Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: My example is wrong

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated comment with a correct example:

https://go.dev/play/p/KBEtIjKFho_b

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx1, cancel1 := context.WithTimeout(context.Background(), 1*time.Millisecond)
	defer cancel1()
	ctx5, cancel5 := context.WithTimeout(ctx1, 5*time.Second)
	defer cancel5()

	start := time.Now()
	select {
	case <-ctx5.Done():
		fmt.Printf("Child context is expired after %s\n", time.Since(start))
	}
}

This prints Child context is expired after 1ms since the child context respects the parent context's deadline.

So with the code today the maximum and default timeout is 10 minutes, and callers can set a shorter timeout via the context. I think this is a perfectly reasonable way for this to work.

The only debate should be if 10 minutes is a long enough maximum timeout.

defer cancel()
p.logger.Logf("Waiting for cloud stack %s to be ready [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID)
ready, err := p.client.DeploymentIsReady(ctx, deploymentID, 30*time.Second)
if err != nil {
return nil, err
return stack, fmt.Errorf("failed to check for cloud %s [stack_id: %s, deployment_id: %s] to be ready: %w", stack.Version, stack.ID, deploymentID, err)
}

var stacks []runner.Stack
for req, resp := range results {
stacks = append(stacks, runner.Stack{
ID: req.ID,
Version: req.Version,
Elasticsearch: resp.ElasticsearchEndpoint,
Kibana: resp.KibanaEndpoint,
Username: resp.Username,
Password: resp.Password,
Internal: map[string]interface{}{
"deployment_id": resp.ID,
},
})
if !ready {
return stack, fmt.Errorf("cloud %s [stack_id: %s, deployment_id: %s] never became ready: %w", stack.Version, stack.ID, deploymentID, err)
}
return stacks, nil
stack.Ready = true
return stack, nil
}

// Clean cleans up all provisioned resources.
func (p *provisioner) Clean(ctx context.Context, stacks []runner.Stack) error {
var errs []error
for _, s := range stacks {
err := p.destroyDeployment(ctx, s)
if err != nil {
errs = append(errs, fmt.Errorf("failed to destroy stack %s (%s): %w", s.Version, s.ID, err))
}
}
if len(errs) > 0 {
return errors.Join(errs...)
// Delete deletes a stack.
func (p *provisioner) Delete(ctx context.Context, stack runner.Stack) error {
deploymentID, err := p.getDeploymentID(stack)
if err != nil {
return err
}
return nil

// allow up to 1 minute for request
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

p.logger.Logf("Destroying cloud stack %s [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID)
return p.client.ShutdownDeployment(ctx, deploymentID)
}

func (p *provisioner) createDeployment(ctx context.Context, r runner.StackRequest, tags map[string]string) (*CreateDeploymentResponse, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

p.logger.Logf("Creating stack %s (%s)", r.Version, r.ID)
p.logger.Logf("Creating cloud stack %s [stack_id: %s]", r.Version, r.ID)
name := fmt.Sprintf("%s-%s", strings.Replace(p.cfg.Identifier, ".", "-", -1), r.ID)

// prepare tags
Expand All @@ -168,26 +154,21 @@ func (p *provisioner) createDeployment(ctx context.Context, r runner.StackReques
p.logger.Logf("Failed to create ESS cloud %s: %s", r.Version, err)
return nil, fmt.Errorf("failed to create ESS cloud for version %s: %w", r.Version, err)
}
p.logger.Logf("Created stack %s (%s) [id: %s]", r.Version, r.ID, resp.ID)
p.logger.Logf("Created cloud stack %s [stack_id: %s, deployment_id: %s]", r.Version, r.ID, resp.ID)
return resp, nil
}

func (p *provisioner) destroyDeployment(ctx context.Context, s runner.Stack) error {
if s.Internal == nil {
return fmt.Errorf("missing internal information")
func (p *provisioner) getDeploymentID(stack runner.Stack) (string, error) {
if stack.Internal == nil {
return "", fmt.Errorf("missing internal information")
}
deploymentIDRaw, ok := s.Internal["deployment_id"]
deploymentIDRaw, ok := stack.Internal["deployment_id"]
if !ok {
return fmt.Errorf("missing internal deployment_id")
return "", fmt.Errorf("missing internal deployment_id")
}
deploymentID, ok := deploymentIDRaw.(string)
if !ok {
return fmt.Errorf("internal deployment_id not a string")
return "", fmt.Errorf("internal deployment_id not a string")
}

ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

p.logger.Logf("Destroying stack %s (%s)", s.Version, s.ID)
return p.client.ShutdownDeployment(ctx, deploymentID)
return deploymentID, nil
}
2 changes: 1 addition & 1 deletion pkg/testing/ess/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/elastic/elastic-agent/pkg/testing/runner"
)

var serverlessURL = "https://staging.found.no"
var serverlessURL = "https://cloud.elastic.co"

// ServerlessClient is the handler the serverless ES instance
type ServerlessClient struct {
Expand Down
Loading
Loading