From 6a5a2b5135f695897503babdb70f022e5b3299d5 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 27 Oct 2020 08:38:52 +0100 Subject: [PATCH 1/2] step --- .../pkg/agent/application/stream.go | 4 +- .../application/upgrade/step_download.go | 62 ++++++++++++++++++- x-pack/elastic-agent/pkg/artifact/config.go | 2 +- .../pkg/artifact/download/fs/verifier.go | 7 ++- .../pkg/artifact/download/http/downloader.go | 12 ++-- .../download/localremote/downloader.go | 6 +- .../artifact/download/localremote/verifier.go | 6 +- .../artifact/download/snapshot/downloader.go | 20 ++++-- .../artifact/download/snapshot/verifier.go | 4 +- 9 files changed, 95 insertions(+), 28 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index 2d372ef4387..0cdbd99082a 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -57,9 +57,9 @@ func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configur } func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) { - fetcher := downloader.NewDownloader(log, config.DownloadConfig, false) + fetcher := downloader.NewDownloader(log, config.DownloadConfig) allowEmptyPgp, pgp := release.PGP() - verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false) + verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp) if err != nil { return nil, errors.New(err, "initiating verifier") } diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go index 0294308ff3a..a649444532f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go @@ -7,15 +7,26 @@ package upgrade import ( "context" "strings" + "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http" downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string) (string, error) { // do not update source config settings := *u.settings + + // agent binaries are a bit larger and do not fit into normal timeout on slower connections + settings.Timeout = 120 * time.Second if sourceURI != "" { if strings.HasPrefix(sourceURI, "file://") { // update the DropPath so the fs.Downloader can download from this @@ -26,13 +37,16 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri } } - allowEmptyPgp, pgp := release.PGP() - verifier, err := downloader.NewVerifier(u.log, &settings, allowEmptyPgp, pgp, true) + verifier, err := newVerifier(version, u.log, &settings) if err != nil { return "", errors.New(err, "initiating verifier") } - fetcher := downloader.NewDownloader(u.log, &settings, true) + fetcher, err := newDownloader(version, u.log, &settings) + if err != nil { + return "", errors.New(err, "initiating fetcher") + } + path, err := fetcher.Download(ctx, agentName, agentArtifactName, version) if err != nil { return "", errors.New(err, "failed upgrade of agent binary") @@ -48,3 +62,45 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri return path, nil } + +func newDownloader(version string, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) { + if !strings.HasSuffix(version, "-SNAPSHOT") { + return downloader.NewDownloader(log, settings), nil + } + + // try snapshot repo before official + snapDownloader, err := snapshot.NewDownloader(settings, version) + if err != nil { + return nil, err + } + + return composed.NewDownloader( + fs.NewDownloader(settings), + snapDownloader, + http.NewDownloader(settings), + ), nil +} + +func newVerifier(version string, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) { + allowEmptyPgp, pgp := release.PGP() + if !strings.HasSuffix(version, "-SNAPSHOT") { + return downloader.NewVerifier(log, settings, allowEmptyPgp, pgp) + } + + fsVerifier, err := fs.NewVerifier(settings, allowEmptyPgp, pgp) + if err != nil { + return nil, err + } + + snapshotVerifier, err := snapshot.NewVerifier(settings, allowEmptyPgp, pgp, version) + if err != nil { + return nil, err + } + + remoteVerifier, err := http.NewVerifier(settings, allowEmptyPgp, pgp) + if err != nil { + return nil, err + } + + return composed.NewVerifier(fsVerifier, snapshotVerifier, remoteVerifier), nil +} diff --git a/x-pack/elastic-agent/pkg/artifact/config.go b/x-pack/elastic-agent/pkg/artifact/config.go index 5b0766cb257..efbd925879c 100644 --- a/x-pack/elastic-agent/pkg/artifact/config.go +++ b/x-pack/elastic-agent/pkg/artifact/config.go @@ -47,7 +47,7 @@ func DefaultConfig() *Config { return &Config{ SourceURI: "https://artifacts.elastic.co/downloads/", TargetDirectory: filepath.Join(homePath, "downloads"), - Timeout: 30 * time.Second, + Timeout: 60 * time.Second, // binaries are a bit larger it might take >30s to download them InstallPath: filepath.Join(homePath, "install"), } } diff --git a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go index 56652d4f69c..e9e405a5def 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go @@ -22,7 +22,8 @@ import ( ) const ( - ascSuffix = ".asc" + ascSuffix = ".asc" + sha512Length = 128 ) // Verifier verifies a downloaded package by comparing with public ASC @@ -93,7 +94,9 @@ func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) { continue } - expectedHash = strings.TrimSpace(strings.TrimSuffix(line, filename)) + if len(line) > sha512Length { + expectedHash = strings.TrimSpace(line[:sha512Length]) + } } if expectedHash == "" { diff --git a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go index 358b793fccf..4af71eca0db 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/http/downloader.go @@ -132,6 +132,12 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } + destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) + if err != nil { + return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)) + } + defer destinationFile.Close() + resp, err := e.client.Do(req.WithContext(ctx)) if err != nil { return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) @@ -142,12 +148,6 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI)) } - destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions) - if err != nil { - return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath)) - } - defer destinationFile.Close() - _, err = io.Copy(destinationFile, resp.Body) return fullPath, nil } diff --git a/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go index ba82195ffbd..6934adc1ea3 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/localremote/downloader.go @@ -17,13 +17,13 @@ import ( // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(log *logger.Logger, config *artifact.Config, forceSnapshot bool) download.Downloader { +func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader { downloaders := make([]download.Downloader, 0, 3) downloaders = append(downloaders, fs.NewDownloader(config)) // try snapshot repo before official - if release.Snapshot() || forceSnapshot { - snapDownloader, err := snapshot.NewDownloader(config) + if release.Snapshot() { + snapDownloader, err := snapshot.NewDownloader(config, "") if err != nil { log.Error(err) } else { diff --git a/x-pack/elastic-agent/pkg/artifact/download/localremote/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/localremote/verifier.go index 30517d12d3d..119121df0cd 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/localremote/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/localremote/verifier.go @@ -17,7 +17,7 @@ import ( // NewVerifier creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte, forceSnapshot bool) (download.Verifier, error) { +func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) { verifiers := make([]download.Verifier, 0, 3) fsVer, err := fs.NewVerifier(config, allowEmptyPgp, pgp) @@ -27,8 +27,8 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool verifiers = append(verifiers, fsVer) // try snapshot repo before official - if release.Snapshot() || forceSnapshot { - snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp) + if release.Snapshot() { + snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp, "") if err != nil { log.Error(err) } else { diff --git a/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go b/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go index 6f28ad8d926..a5b706a243c 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go +++ b/x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go @@ -18,16 +18,16 @@ import ( // NewDownloader creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewDownloader(config *artifact.Config) (download.Downloader, error) { - cfg, err := snapshotConfig(config) +func NewDownloader(config *artifact.Config, versionOverride string) (download.Downloader, error) { + cfg, err := snapshotConfig(config, versionOverride) if err != nil { return nil, err } return http.NewDownloader(cfg), nil } -func snapshotConfig(config *artifact.Config) (*artifact.Config, error) { - snapshotURI, err := snapshotURI() +func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) { + snapshotURI, err := snapshotURI(versionOverride) if err != nil { return nil, fmt.Errorf("failed to detect remote snapshot repo, proceeding with configured: %v", err) } @@ -43,8 +43,16 @@ func snapshotConfig(config *artifact.Config) (*artifact.Config, error) { }, nil } -func snapshotURI() (string, error) { - artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", release.Version()) +func snapshotURI(versionOverride string) (string, error) { + version := release.Version() + if versionOverride != "" { + if strings.HasSuffix(versionOverride, "-SNAPSHOT") { + versionOverride = strings.TrimSuffix(versionOverride, "-SNAPSHOT") + } + version = versionOverride + } + + artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", version) resp, err := gohttp.Get(artifactsURI) if err != nil { return "", err diff --git a/x-pack/elastic-agent/pkg/artifact/download/snapshot/verifier.go b/x-pack/elastic-agent/pkg/artifact/download/snapshot/verifier.go index e9d8bbd4dc1..63757880f96 100644 --- a/x-pack/elastic-agent/pkg/artifact/download/snapshot/verifier.go +++ b/x-pack/elastic-agent/pkg/artifact/download/snapshot/verifier.go @@ -12,8 +12,8 @@ import ( // NewVerifier creates a downloader which first checks local directory // and then fallbacks to remote if configured. -func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) { - cfg, err := snapshotConfig(config) +func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride string) (download.Verifier, error) { + cfg, err := snapshotConfig(config, versionOverride) if err != nil { return nil, err } From fafe325be790bf5404c1bb84d4a52e44c054ed00 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 27 Oct 2020 16:47:54 +0100 Subject: [PATCH 2/2] unified timeout --- .../pkg/agent/application/upgrade/step_download.go | 4 ---- x-pack/elastic-agent/pkg/artifact/config.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go index a649444532f..e96296016a4 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/step_download.go @@ -7,7 +7,6 @@ package upgrade import ( "context" "strings" - "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" @@ -24,9 +23,6 @@ import ( func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI string) (string, error) { // do not update source config settings := *u.settings - - // agent binaries are a bit larger and do not fit into normal timeout on slower connections - settings.Timeout = 120 * time.Second if sourceURI != "" { if strings.HasPrefix(sourceURI, "file://") { // update the DropPath so the fs.Downloader can download from this diff --git a/x-pack/elastic-agent/pkg/artifact/config.go b/x-pack/elastic-agent/pkg/artifact/config.go index efbd925879c..81ed3f856fc 100644 --- a/x-pack/elastic-agent/pkg/artifact/config.go +++ b/x-pack/elastic-agent/pkg/artifact/config.go @@ -47,7 +47,7 @@ func DefaultConfig() *Config { return &Config{ SourceURI: "https://artifacts.elastic.co/downloads/", TargetDirectory: filepath.Join(homePath, "downloads"), - Timeout: 60 * time.Second, // binaries are a bit larger it might take >30s to download them + Timeout: 120 * time.Second, // binaries are a getting bit larger it might take >30s to download them InstallPath: filepath.Join(homePath, "install"), } }