Skip to content

Commit

Permalink
[Ingest Manager] Agent fix snapshot download for upgrade (elastic#22175)
Browse files Browse the repository at this point in the history
[Ingest Manager] Agent fix snapshot download for upgrade (elastic#22175)
  • Loading branch information
michalpristas committed Oct 27, 2020
1 parent ee7445b commit 90eed06
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 28 deletions.
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configur
}

func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig, false)
fetcher := downloader.NewDownloader(log, config.DownloadConfig)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false)
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp)
if err != nil {
return nil, errors.New(err, "initiating verifier")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/composed"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/fs"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/http"
downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/snapshot"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

Expand All @@ -26,13 +33,16 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
}
}

allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(u.log, &settings, allowEmptyPgp, pgp, true)
verifier, err := newVerifier(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating verifier")
}

fetcher := downloader.NewDownloader(u.log, &settings, true)
fetcher, err := newDownloader(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating fetcher")
}

path, err := fetcher.Download(ctx, agentName, agentArtifactName, version)
if err != nil {
return "", errors.New(err, "failed upgrade of agent binary")
Expand All @@ -48,3 +58,45 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri

return path, nil
}

func newDownloader(version string, log *logger.Logger, settings *artifact.Config) (download.Downloader, error) {
if !strings.HasSuffix(version, "-SNAPSHOT") {
return downloader.NewDownloader(log, settings), nil
}

// try snapshot repo before official
snapDownloader, err := snapshot.NewDownloader(settings, version)
if err != nil {
return nil, err
}

return composed.NewDownloader(
fs.NewDownloader(settings),
snapDownloader,
http.NewDownloader(settings),
), nil
}

func newVerifier(version string, log *logger.Logger, settings *artifact.Config) (download.Verifier, error) {
allowEmptyPgp, pgp := release.PGP()
if !strings.HasSuffix(version, "-SNAPSHOT") {
return downloader.NewVerifier(log, settings, allowEmptyPgp, pgp)
}

fsVerifier, err := fs.NewVerifier(settings, allowEmptyPgp, pgp)
if err != nil {
return nil, err
}

snapshotVerifier, err := snapshot.NewVerifier(settings, allowEmptyPgp, pgp, version)
if err != nil {
return nil, err
}

remoteVerifier, err := http.NewVerifier(settings, allowEmptyPgp, pgp)
if err != nil {
return nil, err
}

return composed.NewVerifier(fsVerifier, snapshotVerifier, remoteVerifier), nil
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func DefaultConfig() *Config {
return &Config{
SourceURI: "https://artifacts.elastic.co/downloads/",
TargetDirectory: filepath.Join(homePath, "downloads"),
Timeout: 30 * time.Second,
Timeout: 120 * time.Second, // binaries are a getting bit larger it might take >30s to download them
InstallPath: filepath.Join(homePath, "install"),
}
}
Expand Down
7 changes: 5 additions & 2 deletions x-pack/elastic-agent/pkg/artifact/download/fs/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ascSuffix = ".asc"
ascSuffix = ".asc"
sha512Length = 128
)

// Verifier verifies a downloaded package by comparing with public ASC
Expand Down Expand Up @@ -93,7 +94,9 @@ func (v *Verifier) verifyHash(filename, fullPath string) (bool, error) {
continue
}

expectedHash = strings.TrimSpace(strings.TrimSuffix(line, filename))
if len(line) > sha512Length {
expectedHash = strings.TrimSpace(line[:sha512Length])
}
}

if expectedHash == "" {
Expand Down
12 changes: 6 additions & 6 deletions x-pack/elastic-agent/pkg/artifact/download/http/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

resp, err := e.client.Do(req.WithContext(ctx))
if err != nil {
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
Expand All @@ -142,12 +148,6 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

destinationFile, err := os.OpenFile(fullPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, packagePermissions)
if err != nil {
return "", errors.New(err, "creating package file failed", errors.TypeFilesystem, errors.M(errors.MetaKeyPath, fullPath))
}
defer destinationFile.Close()

_, err = io.Copy(destinationFile, resp.Body)
return fullPath, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(log *logger.Logger, config *artifact.Config, forceSnapshot bool) download.Downloader {
func NewDownloader(log *logger.Logger, config *artifact.Config) download.Downloader {
downloaders := make([]download.Downloader, 0, 3)
downloaders = append(downloaders, fs.NewDownloader(config))

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapDownloader, err := snapshot.NewDownloader(config)
if release.Snapshot() {
snapDownloader, err := snapshot.NewDownloader(config, "")
if err != nil {
log.Error(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte, forceSnapshot bool) (download.Verifier, error) {
func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
verifiers := make([]download.Verifier, 0, 3)

fsVer, err := fs.NewVerifier(config, allowEmptyPgp, pgp)
Expand All @@ -27,8 +27,8 @@ func NewVerifier(log *logger.Logger, config *artifact.Config, allowEmptyPgp bool
verifiers = append(verifiers, fsVer)

// try snapshot repo before official
if release.Snapshot() || forceSnapshot {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp)
if release.Snapshot() {
snapshotVerifier, err := snapshot.NewVerifier(config, allowEmptyPgp, pgp, "")
if err != nil {
log.Error(err)
} else {
Expand Down
20 changes: 14 additions & 6 deletions x-pack/elastic-agent/pkg/artifact/download/snapshot/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (

// NewDownloader creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewDownloader(config *artifact.Config) (download.Downloader, error) {
cfg, err := snapshotConfig(config)
func NewDownloader(config *artifact.Config, versionOverride string) (download.Downloader, error) {
cfg, err := snapshotConfig(config, versionOverride)
if err != nil {
return nil, err
}
return http.NewDownloader(cfg), nil
}

func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
snapshotURI, err := snapshotURI()
func snapshotConfig(config *artifact.Config, versionOverride string) (*artifact.Config, error) {
snapshotURI, err := snapshotURI(versionOverride)
if err != nil {
return nil, fmt.Errorf("failed to detect remote snapshot repo, proceeding with configured: %v", err)
}
Expand All @@ -43,8 +43,16 @@ func snapshotConfig(config *artifact.Config) (*artifact.Config, error) {
}, nil
}

func snapshotURI() (string, error) {
artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", release.Version())
func snapshotURI(versionOverride string) (string, error) {
version := release.Version()
if versionOverride != "" {
if strings.HasSuffix(versionOverride, "-SNAPSHOT") {
versionOverride = strings.TrimSuffix(versionOverride, "-SNAPSHOT")
}
version = versionOverride
}

artifactsURI := fmt.Sprintf("https://artifacts-api.elastic.co/v1/search/%s-SNAPSHOT/elastic-agent", version)
resp, err := gohttp.Get(artifactsURI)
if err != nil {
return "", err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

// NewVerifier creates a downloader which first checks local directory
// and then fallbacks to remote if configured.
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte) (download.Verifier, error) {
cfg, err := snapshotConfig(config)
func NewVerifier(config *artifact.Config, allowEmptyPgp bool, pgp []byte, versionOverride string) (download.Verifier, error) {
cfg, err := snapshotConfig(config, versionOverride)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 90eed06

Please sign in to comment.