Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream archived content #472

Merged
merged 23 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate handlebarsjs stream configuration templates. [#445](https://github.com/elastic/package-registry/pull/445)
* Serve favicon as embedded resource. [#468](https://github.com/elastic/package-registry/pull/468)
* Generate index.json file. [#470](https://github.com/elastic/package-registry/pull/470)
* Stream archived package content. [#472](https://github.com/elastic/package-registry/pull/472)

### Deprecated

Expand Down
112 changes: 112 additions & 0 deletions archiver/archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package archiver

import (
"archive/tar"
"compress/gzip"
"io"
"log"
"os"
"path/filepath"

"github.com/pkg/errors"
)

// ArchivePackage method builds and streams an archive with package content.
func ArchivePackage(w io.Writer, packagePath string) error {
gzipWriter := gzip.NewWriter(w)
tarWriter := tar.NewWriter(gzipWriter)
defer func() {
err := tarWriter.Close()
if err != nil {
log.Printf("Error occurred while closing tar writer: %v", err)
}

err = gzipWriter.Close()
if err != nil {
log.Printf("Error occurred while closing gzip writer: %v", err)
}
}()

err := filepath.Walk(packagePath, func(path string, info os.FileInfo, err error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some logic to skip hidden files that might be created by the system like .DS_Store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't pollute the code with references to files that are ".gitignored" anyway. The docker image is created and pushed in the CI environment, I don't think we can be afraid of presence of such files there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is not only run inside docker but also locally.

if err != nil {
return err
}

relativePath, err := filepath.Rel(packagePath, path)
if err != nil {
return errors.Wrapf(err, "finding relative path failed (packagePath: %s, path: %s)", packagePath, path)
}

if relativePath == "." {
return nil
}

header, err := buildArchiveHeader(info, relativePath)
if err != nil {
return errors.Wrapf(err, "building archive header failed (path: %s)", relativePath)
}

err = tarWriter.WriteHeader(header)
if err != nil {
return errors.Wrapf(err, "writing header failed (path: %s)", relativePath)
}

if !info.IsDir() {
err = writeFileContentToArchive(path, tarWriter)
if err != nil {
return errors.Wrapf(err, "archiving file content failed (path: %s)", path)
}
}
return nil
})
if err != nil {
return errors.Wrapf(err, "processing package path '%s' failed", packagePath)
}

err = tarWriter.Flush()
if err != nil {
return errors.Wrap(err, "flushing tar writer failed")
}

err = gzipWriter.Flush()
if err != nil {
return errors.Wrap(err, "flushing gzip writer failed")
}
return nil
}

func buildArchiveHeader(info os.FileInfo, relativePath string) (*tar.Header, error) {
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return nil, errors.Wrapf(err, "reading file info header failed (info: %s)", info.Name())
}

header.Name = relativePath
if info.IsDir() {
header.Name = header.Name + "/"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this / dependent on the platfrom or always /?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added an additional condition to check the /.

}
return header, nil
}

func writeFileContentToArchive(path string, writer io.Writer) error {
f, err := os.Open(path)
if err != nil {
return errors.Wrapf(err, "opening file failed (path: %s)", path)
}
defer func() {
err := f.Close()
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using a named return to still return an error in case this fails instead of writing it to the log file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is in defer, so it's not on the direct execution path. Also, if we've already started streaming and something really bad happens, we won't inform the use what happened, we can just cut the response stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get This code is in defer, so it's not on the direct execution path. You can still assign the error with a named return. So the function that calls "writeFileContentToArchive" can decide if it wants to drop the error or not.

Copy link
Contributor Author

@mtojek mtojek May 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but on the other hand you can overwrite errors that caused the problem: https://play.golang.org/p/glEov8stXWq

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used multierror for this. Fixed.

log.Printf("Error occurred while closing file (path: %s): %v", path, err)
}
}()

_, err = io.Copy(writer, f)
if err != nil {
return errors.Wrapf(err, "copying file content failed (path: %s)", path)
}
return nil
}
59 changes: 59 additions & 0 deletions artifacts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this code under util so we can reuse it? Lets assume we have one day a tool elastic-package that allows us to validate a package and also build the .tar.gz file to upload manually to Kibana. Would be nice to share this code there. It will probably required to separate the handler code from the building the tar.gz code but I would assume this makes it easier to test if not already the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with moving it to a different place except utils, commons, shared, etc. ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion around the name. I assume this means we should also move the other utils code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire project can be adjusted to the standard layout: https://github.com/golang-standards/project-layout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to "archiver"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM to cleanup to follow the standard layout but lets do it in small steps.


import (
"log"
"net/http"
"os"
"path/filepath"
"time"

"github.com/blang/semver"
"github.com/gorilla/mux"
"github.com/pkg/errors"

"github.com/elastic/package-registry/archiver"
)

const artifactsRouterPath = "/epr/{packageName}/{packageName:[a-z_]+}-{packageVersion}.tar.gz"

var errArtifactNotFound = errors.New("artifact not found")

func artifactsHandler(packagesBasePath string, cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
packageName := vars["packageName"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate that these entries exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Package name is restricted with regex:

const artifactsRouterPath = "/epr/{packageName}/{packageName:[a-z_]+}-{packageVersion}.tar.gz"

Package version is checked with semver.Parse:

		_, err := semver.Parse(packageVersion)
		if err != nil {
			badRequest(w, "invalid package version")
			return
		}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was more referring to check if the key exists: vars["packageName"] but as we have it in the router path I guess this should never happen as otherwise the routing would be wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

packageVersion := vars["packageVersion"]

_, err := semver.Parse(packageVersion)
if err != nil {
badRequest(w, "invalid package version")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to print out the package version that was invalid here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The version is in URL and I don't expect that the user visits this endpoint anyway. I would leave it as is.

return
}

packagePath := filepath.Join(packagesBasePath, packageName, packageVersion)
_, err = os.Stat(packagePath)
if os.IsNotExist(err) {
notFoundError(w, errArtifactNotFound)
return
}
if err != nil {
log.Printf("stat package path '%s' failed: %v", packagePath, err)

http.Error(w, "internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/gzip")
cacheHeaders(w, cacheTime)

err = archiver.ArchivePackage(w, packagePath)
if err != nil {
log.Printf("archiving package path '%s' failed: %v", packagePath, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the error here already part of w so we don't need to have a badRequest here?

Copy link
Contributor Author

@mtojek mtojek May 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the handler has already started streaming response (an archive) and an internal error occurs, you can't do anything in this situation. Headers are already sent.

You can simply cut-off the stream and wait for your upstream dependency to retry the activity (as the package is corrupted).

return
}
}
}
34 changes: 0 additions & 34 deletions dev/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,40 +211,6 @@ func buildPackage(packagesBasePath string, p util.Package) error {
return err
}
}

if tarGz {
mtojek marked this conversation as resolved.
Show resolved Hide resolved
tarGzDirPath := filepath.Join(packagesBasePath, "..", "epr", p.Name)
err = os.MkdirAll(tarGzDirPath, 0755)
if err != nil {
return err
}

tarGzName := p.Name + "-" + p.Version
dst := filepath.Join(tarGzDirPath, tarGzName)

// As the package directories are now {packagename}/{version} when just running tar, the dir inside
// the package had the wrong name. Using `-s` or `--transform` for some reason worked on the command line
// but not when run through Golang. So the hack for now is to just copy over all files with the correct name
// and then run tar on it.
// This could become even useful in the future as things like images or videos should potentially not be part of
// a tar.gz to keep it small.
src := filepath.Join(packagesBasePath, p.Name, p.Version) + "/"
err := CopyPackage(src, dst)
if err != nil {
return errors.Wrapf(err, "copying package content failed (path: %s)", p.GetPath())
}

err = sh.RunV("tar", "czf", filepath.Join(packagesBasePath, "..", "epr", p.Name, tarGzName+".tar.gz"), "-C", tarGzDirPath, tarGzName+"/")
if err != nil {
return errors.Wrapf(err, "compressing package failed (path: %s)", p.GetPath())
}

err = os.RemoveAll(dst)
if err != nil {
return err
}
}

return nil
}

Expand Down
27 changes: 27 additions & 0 deletions docs/api/example-0.0.2.tar.gz-preview.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
0 docs/
622 docs/README.md
0 elasticsearch/
0 elasticsearch/ingest-pipeline/
892 elasticsearch/ingest-pipeline/pipeline-entry.json
2071 elasticsearch/ingest-pipeline/pipeline-http.json
887 elasticsearch/ingest-pipeline/pipeline-json.json
3584 elasticsearch/ingest-pipeline/pipeline-plaintext.json
900 elasticsearch/ingest-pipeline/pipeline-tcp.json
0 img/
482070 img/kibana-envoyproxy.jpg
1916 index.json
0 kibana/
0 kibana/dashboard/
2221 kibana/dashboard/0c610510-5cbd-11e9-8477-077ec9664dbd.json
0 kibana/index-pattern/
91348 kibana/index-pattern/filebeat.json
0 kibana/infrastructure-ui-source/
797 kibana/infrastructure-ui-source/default.json
0 kibana/visualization/
1863 kibana/visualization/0a994af0-5c9d-11e9-8477-077ec9664dbd.json
1982 kibana/visualization/36f872a0-5c03-11e9-85b4-19d0072eb4f2.json
2572 kibana/visualization/38f96190-5c99-11e9-8477-077ec9664dbd.json
1995 kibana/visualization/7e4084e0-5c99-11e9-8477-077ec9664dbd.json
1849 kibana/visualization/80844540-5c97-11e9-8477-077ec9664dbd.json
1920 kibana/visualization/ab48c3f0-5ca6-11e9-8477-077ec9664dbd.json
319 manifest.yml
2 changes: 1 addition & 1 deletion docs/api/index.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"service.name": "package-registry",
"version": "0.4.0"
}
}
1 change: 1 addition & 0 deletions docs/api/package-invalid-version.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
invalid package version
1 change: 1 addition & 0 deletions docs/api/package-missing.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
artifact not found
1 change: 1 addition & 0 deletions docs/api/package-version-not-found.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
artifact not found
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func getConfig() (*Config, error) {
}

func getRouter(config Config, packagesBasePath string) (*mux.Router, error) {
artifactsHandler := artifactsHandler(packagesBasePath, config.CacheTimeCatchAll)
faviconHandleFunc, err := faviconHandler(config.CacheTimeCatchAll)
if err != nil {
return nil, err
Expand All @@ -136,7 +137,7 @@ func getRouter(config Config, packagesBasePath string) (*mux.Router, error) {
router.HandleFunc("/categories", categoriesHandler(packagesBasePath, config.CacheTimeCategories))
router.HandleFunc("/health", healthHandler)
router.HandleFunc("/favicon.ico", faviconHandleFunc)
router.PathPrefix("/epr").HandlerFunc(catchAll(http.Dir(config.PublicDir), config.CacheTimeCatchAll))
router.HandleFunc(artifactsRouterPath, artifactsHandler)
router.PathPrefix("/package").HandlerFunc(catchAll(http.Dir(config.PublicDir), config.CacheTimeCatchAll))
router.Use(loggingMiddleware)
return router, nil
Expand Down
63 changes: 58 additions & 5 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
package main

import (
"archive/tar"
"bytes"
"compress/gzip"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -27,7 +31,6 @@ var (
)

func TestEndpoints(t *testing.T) {

publicPath := "./testdata/public"
packagesBasePath := publicPath + "/package"

Expand Down Expand Up @@ -71,6 +74,31 @@ func TestEndpoints(t *testing.T) {
}
}

func TestArtifacts(t *testing.T) {
publicPath := "./testdata/public"
packagesBasePath := publicPath + "/package"

artifactsHandler := artifactsHandler(packagesBasePath, testCacheTime)

tests := []struct {
endpoint string
path string
file string
handler func(w http.ResponseWriter, r *http.Request)
}{
{"/epr/example/example-0.0.2.tar.gz", artifactsRouterPath, "example-0.0.2.tar.gz-preview.txt", artifactsHandler},
{"/epr/example/example-999.0.2.tar.gz", artifactsRouterPath, "package-version-not-found.txt", artifactsHandler},
{"/epr/example/missing-0.1.2.tar.gz", artifactsRouterPath, "package-missing.txt", artifactsHandler},
{"/epr/example/example-a.b.c.tar.gz", artifactsRouterPath, "package-invalid-version.txt", artifactsHandler},
}

for _, test := range tests {
t.Run(test.endpoint, func(t *testing.T) {
runEndpoint(t, test.endpoint, test.path, test.file, test.handler)
})
}
}

func runEndpoint(t *testing.T, endpoint, path, file string, handler func(w http.ResponseWriter, r *http.Request)) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
Expand All @@ -89,8 +117,13 @@ func runEndpoint(t *testing.T, endpoint, path, file string, handler func(w http.

fullPath := "./docs/api/" + file

recorded := recorder.Body.Bytes()
if strings.HasSuffix(file, "-preview.txt") {
recorded = listArchivedFiles(t, recorded)
}

if *generateFlag {
err = ioutil.WriteFile(fullPath, recorder.Body.Bytes(), 0644)
err = ioutil.WriteFile(fullPath, recorded, 0644)
if err != nil {
t.Fatal(err)
}
Expand All @@ -101,11 +134,31 @@ func runEndpoint(t *testing.T, endpoint, path, file string, handler func(w http.
t.Fatal(err)
}

assert.Equal(t, strings.TrimSpace(string(data)), strings.TrimSpace(recorder.Body.String()))
assert.Equal(t, bytes.TrimSpace(data), bytes.TrimSpace(recorded))

// Skip cache check if 400 error
if recorder.Code != 400 {
// Skip cache check if 4xx error
if recorder.Code >= 200 && recorder.Code < 300 {
cacheTime := fmt.Sprintf("%.0f", testCacheTime.Seconds())
assert.Equal(t, recorder.Header()["Cache-Control"], []string{"max-age=" + cacheTime, "public"})
}
}

func listArchivedFiles(t *testing.T, body []byte) []byte {
gzippedReader, err := gzip.NewReader(bytes.NewReader(body))
require.NoError(t, err)

tarReader := tar.NewReader(gzippedReader)

var listing bytes.Buffer

for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
require.NoError(t, err)

listing.WriteString(fmt.Sprintf("%d %s\n", header.Size, header.Name))
}
return listing.Bytes()
}
2 changes: 1 addition & 1 deletion util/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewDataset(basePath string, p *Package) (*DataSet, error) {
}
}

if !IsValidRelase(d.Release) {
if !IsValidRelease(d.Release) {
return nil, fmt.Errorf("invalid release: %s", d.Release)
}

Expand Down
Loading