Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Agent fix snapshot download for upgrade #22175

Merged
merged 3 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configur
}

func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig, false)
fetcher := downloader.NewDownloader(log, config.DownloadConfig)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false)
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp)
if err != nil {
return nil, errors.New(err, "initiating verifier")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

Expand All @@ -26,13 +33,16 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
}
}

allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(u.log, &settings, allowEmptyPgp, pgp, true)
verifier, err := newVerifier(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating verifier")
}

fetcher := downloader.NewDownloader(u.log, &settings, true)
fetcher, err := newDownloader(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating fetcher")
}

path, err := fetcher.Download(ctx, agentName, agentArtifactName, version)
if err != nil {
return "", errors.New(err, "failed upgrade of agent binary")
Expand All @@ -48,3 +58,45 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri

return path, nil
}

func newDownloader(version string, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) {
if !strings.HasSuffix(version, "-SNAPSHOT") {
return downloader.NewDownloader(log, settings), nil
}

// try snapshot repo before official
snapDownloader, err := snapshot.NewDownloader(settings, version)
if err != nil {
return nil, err
}

return composed.NewDownloader(
fs.NewDownloader(settings),
snapDownloader,
http.NewDownloader(settings),
), nil
}

func newVerifier(version string, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) {
allowEmptyPgp, pgp := release.PGP()
if !strings.HasSuffix(version, "-SNAPSHOT") {
return downloader.NewVerifier(log, settings, allowEmptyPgp, pgp)
}

fsVerifier, err := fs.NewVerifier(settings, allowEmptyPgp, pgp)
if err != nil {
return nil, err
}

snapshotVerifier, err := snapshot.NewVerifier(settings, allowEmptyPgp, pgp, version)
if err != nil {
return nil, err
}

remoteVerifier, err := http.NewVerifier(settings, allowEmptyPgp, pgp)
if err != nil {
return nil, err
}

return composed.NewVerifier(fsVerifier, snapshotVerifier, remoteVerifier), nil
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func DefaultConfig() *Config {
return &Config{
SourceURI: "https://artifacts.elastic.co/downloads/",
TargetDirectory: filepath.Join(homePath, "downloads"),
Timeout: 30 * time.Second,
Timeout: 120 * time.Second, // binaries are a getting bit larger it might take >30s to download them
InstallPath: filepath.Join(homePath, "install"),
}
}
Expand Down
7 changes: 5 additions & 2 deletions x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ascSuffix = ".asc"
ascSuffix = ".asc"
sha512Length = 128
)

// Verifier verifies a downloaded package by comparing with public ASC
Expand Down Expand Up @@ -93,7 +94,9 @@ func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {
continue
}

expectedHash = strings.TrimSpace(strings.TrimSuffix(line, filename))
if len(line) > sha512Length {
expectedHash = strings.TrimSpace(line[:sha512Length])
}
}

if expectedHash == "" {
Expand Down
12 changes: 6 additions & 6 deletions x-pack/elastic-agent/pkg/artifact/download/http/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to take the local file into account first? Not sure I fully follow this part here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was to ensure that the file path to write to can be opened before even starting the HTTP connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly, just a micro optimization

if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

resp, err := e.client.Do(req.WithContext(ctx))
if err != nil {
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
Expand All @@ -142,12 +148,6 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

_, err = io.Copy(destinationFile, resp.Body)
return fullPath, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(log *logger.Logger, config *artifact.Config, forceSnapshot bool) download.Downloader {
func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader {
downloaders := make([]download.Downloader, 0, 3)
downloaders = append(downloaders, fs.NewDownloader(config))

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapDownloader, err := snapshot.NewDownloader(config)
if release.Snapshot() {
snapDownloader, err := snapshot.NewDownloader(config, "")
if err != nil {
log.Error(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte, forceSnapshot bool) (download.Verifier, error) {
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
verifiers := make([]download.Verifier, 0, 3)

fsVer, err := fs.NewVerifier(config, allowEmptyPgp, pgp)
Expand All @@ -27,8 +27,8 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool
verifiers = append(verifiers, fsVer)

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp)
if release.Snapshot() {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp, "")
if err != nil {
log.Error(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(config *artifact.Config) (download.Downloader, error) {
cfg, err := snapshotConfig(config)
func NewDownloader(config *artifact.Config, versionOverride string) (download.Downloader, error) {
cfg, err := snapshotConfig(config, versionOverride)
if err != nil {
return nil, err
}
return http.NewDownloader(cfg), nil
}

func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
snapshotURI, err := snapshotURI()
func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) {
snapshotURI, err := snapshotURI(versionOverride)
if err != nil {
return nil, fmt.Errorf("failed to detect remote snapshot repo, proceeding with configured: %v", err)
}
Expand All @@ -43,8 +43,16 @@ func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
}, nil
}

func snapshotURI() (string, error) {
artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", release.Version())
func snapshotURI(versionOverride string) (string, error) {
version := release.Version()
if versionOverride != "" {
if strings.HasSuffix(versionOverride, "-SNAPSHOT") {
versionOverride = strings.TrimSuffix(versionOverride, "-SNAPSHOT")
}
version = versionOverride
}

artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", version)
resp, err := gohttp.Get(artifactsURI)
if err != nil {
return "", err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
cfg, err := snapshotConfig(config)
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride string) (download.Verifier, error) {
cfg, err := snapshotConfig(config, versionOverride)
if err != nil {
return nil, err
}
Expand Down