From c774944b1b1142388f7161085e6e5aff390cfe47 Mon Sep 17 00:00:00 2001 From: Daniel Herman Date: Mon, 23 Nov 2020 19:06:44 -0500 Subject: [PATCH] fix(server): serve artifacts directly from disk to support large artifacts When serving very large artifacts, first loading them into memory can potentially cause the pod to go OOM/crash depending on how much memory is available and what limits have been set. Rather than loading it into memory, we can serve files directly from disk. Fixes #4588 Signed-off-by: Daniel Herman --- server/artifacts/artifact_server.go | 54 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/server/artifacts/artifact_server.go b/server/artifacts/artifact_server.go index 38d2359db21d..63888e0c7e15 100644 --- a/server/artifacts/artifact_server.go +++ b/server/artifacts/artifact_server.go @@ -7,7 +7,9 @@ import ( "net/http" "os" "path" + "strconv" "strings" + "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -62,15 +64,12 @@ func (a *ArtifactServer) GetArtifact(w http.ResponseWriter, r *http.Request) { return } - data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName) + err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName) if err != nil { a.serverInternalError(err, w) return } - - w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename)) - a.ok(w, data) } func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request) { @@ -96,15 +95,12 @@ func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request return } - data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName) + err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName) if err != nil { a.serverInternalError(err, w) return } - - w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename)) - a.ok(w, data) } func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) { @@ -123,50 +119,58 @@ func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) { return a.gatekeeper.Context(ctx) } -func (a *ArtifactServer) ok(w http.ResponseWriter, data []byte) { - w.WriteHeader(200) - _, err := w.Write(data) - if err != nil { - a.serverInternalError(err, w) - } -} - func (a *ArtifactServer) serverInternalError(err error, w http.ResponseWriter) { w.WriteHeader(500) _, _ = w.Write([]byte(err.Error())) } -func (a *ArtifactServer) getArtifact(ctx context.Context, wf *wfv1.Workflow, nodeId, artifactName string) ([]byte, string, error) { +func (a *ArtifactServer) returnArtifact(ctx context.Context, w http.ResponseWriter, r *http.Request, wf *wfv1.Workflow, nodeId, artifactName string) error { kubeClient := auth.GetKubeClient(ctx) art := wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName) if art == nil { - return nil, "", fmt.Errorf("artifact not found") + return fmt.Errorf("artifact not found") } driver, err := a.artDriverFactory(art, resources{kubeClient, wf.Namespace}) if err != nil { - return nil, "", err + return err } tmp, err := ioutil.TempFile("/tmp", "artifact") if err != nil { - return nil, "", err + return err } tmpPath := tmp.Name() defer func() { _ = os.Remove(tmpPath) }() err = driver.Load(art, tmpPath) if err != nil { - return nil, "", err + return err + } + + file, err := os.Open(tmpPath) + + if err != nil { + return err } - file, err := ioutil.ReadFile(tmpPath) + defer file.Close() + + stats, err := file.Stat() + if err != nil { - return nil, "", err + return err } - log.WithFields(log.Fields{"size": len(file)}).Debug("Artifact file size") - return file, path.Base(art.GetKey()), nil + contentLength := strconv.FormatInt(stats.Size(), 10) + log.WithFields(log.Fields{"size": contentLength}).Debug("Artifact file size") + + w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, path.Base(art.GetKey()))) + w.WriteHeader(200) + + http.ServeContent(w, r, "", time.Time{}, file) + + return nil } func (a *ArtifactServer) getWorkflowAndValidate(ctx context.Context, namespace string, workflowName string) (*wfv1.Workflow, error) {