Skip to content

Commit

Permalink
Readall when serving image. (#6201)
Browse files Browse the repository at this point in the history
  • Loading branch information
stereosteve authored Oct 3, 2023
1 parent 1362f0e commit 38525ef
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 153 deletions.
10 changes: 5 additions & 5 deletions mediorum/server/serve_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"golang.org/x/exp/slices"
)

func (ss *MediorumServer) getBlobLocation(c echo.Context) error {
func (ss *MediorumServer) serveBlobLocation(c echo.Context) error {
cid := c.Param("cid")
preferred, _ := ss.rendezvousAllHosts(cid)

Expand Down Expand Up @@ -92,7 +92,7 @@ func (ss *MediorumServer) getBlobLocation(c echo.Context) error {
})
}

func (ss *MediorumServer) getBlobInfo(c echo.Context) error {
func (ss *MediorumServer) serveBlobInfo(c echo.Context) error {
ctx := c.Request().Context()
cid := c.Param("cid")
key := cidutil.ShardCID(cid)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (ss *MediorumServer) ensureNotDelisted(next echo.HandlerFunc) echo.HandlerF
}
}

func (ss *MediorumServer) getBlob(c echo.Context) error {
func (ss *MediorumServer) serveBlob(c echo.Context) error {
ctx := c.Request().Context()
cid := c.Param("cid")

Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *MediorumServer) requireRegisteredSignature(next echo.HandlerFunc) echo.
}
}

func (ss *MediorumServer) serveInternalBlobPull(c echo.Context) error {
func (ss *MediorumServer) serveInternalBlobGET(c echo.Context) error {
ctx := c.Request().Context()
cid := c.Param("cid")
key := cidutil.ShardCID(cid)
Expand All @@ -402,7 +402,7 @@ func (ss *MediorumServer) serveInternalBlobPull(c echo.Context) error {
return c.Stream(200, blob.ContentType(), blob)
}

func (ss *MediorumServer) postBlob(c echo.Context) error {
func (ss *MediorumServer) serveInternalBlobPOST(c echo.Context) error {
if !ss.diskHasSpace() {
return c.String(http.StatusServiceUnavailable, "disk is too full to accept new blobs")
}
Expand Down
145 changes: 145 additions & 0 deletions mediorum/server/serve_image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package server

import (
"fmt"
"io"
"mediorum/cidutil"
"mime"
"strings"
"time"

"github.com/labstack/echo/v4"
"gocloud.dev/blob"
)

func (ss *MediorumServer) serveImage(c echo.Context) error {
// images are small enough to always serve all at once (no 206 range responses)
c.Request().Header.Del("Range")

ctx := c.Request().Context()
jobID := c.Param("jobID")
variant := c.Param("variant")
isOriginalJpg := variant == "original.jpg"

// helper function... only sets cache-control header on success
serveSuccessWithReader := func(blob *blob.Reader) error {
blobData, err := io.ReadAll(blob)
if err != nil {
return err
}
blob.Close()
c.Response().Header().Set(echo.HeaderCacheControl, "public, max-age=2592000, immutable")
return c.Blob(200, blob.ContentType(), blobData)
}

serveSuccess := func(blobPath string) error {
if blob, err := ss.bucket.NewReader(ctx, blobPath, nil); err == nil {
return serveSuccessWithReader(blob)
} else {
return err
}
}

// if the client provided a filename, set it in the header to be auto-populated in download prompt
filenameForDownload := c.QueryParam("filename")
if filenameForDownload != "" {
contentDisposition := mime.QEncoding.Encode("utf-8", filenameForDownload)
c.Response().Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, contentDisposition))
}

// 1. resolve ulid to upload cid
if cid, err := ss.getUploadOrigCID(jobID); err == nil {
c.Response().Header().Set("x-orig-cid", cid)
jobID = cid
}

// 2. if is ba ... serve variant
if strings.HasPrefix(jobID, "ba") {
baCid := jobID

var variantStoragePath string

// parse 150x150 dimensions
// while still allowing original.jpg
w, h, err := parseVariantSize(variant)
if err == nil {
variantStoragePath = cidutil.ImageVariantPath(baCid, variant)
} else if isOriginalJpg {
variantStoragePath = cidutil.ShardCID(baCid)
} else {
return c.String(400, err.Error())
}

c.Response().Header().Set("x-variant-storage-path", variantStoragePath)

// we already have the resized version
if blob, err := ss.bucket.NewReader(ctx, variantStoragePath, nil); err == nil {
return serveSuccessWithReader(blob)
}

// open the orig for resizing
origReader, err := ss.bucket.NewReader(ctx, cidutil.ShardCID(baCid), nil)

// if we don't have orig, fetch from network
if err != nil {
startFetch := time.Now()
host, err := ss.findAndPullBlob(ctx, baCid)
if err != nil {
return c.String(404, err.Error())
}

c.Response().Header().Set("x-fetch-host", host)
c.Response().Header().Set("x-fetch-ok", fmt.Sprintf("%.2fs", time.Since(startFetch).Seconds()))

origReader, err = ss.bucket.NewReader(ctx, cidutil.ShardCID(baCid), nil)
if err != nil {
return err
}
}

// do resize if not original.jpg
if !isOriginalJpg {
resizeStart := time.Now()
resized, _, _ := Resized(".jpg", origReader, w, h, "fill")
w, _ := ss.bucket.NewWriter(ctx, variantStoragePath, nil)
io.Copy(w, resized)
w.Close()
c.Response().Header().Set("x-resize-ok", fmt.Sprintf("%.2fs", time.Since(resizeStart).Seconds()))
}
origReader.Close()

// ... serve it
return serveSuccess(variantStoragePath)
}

// 3. if Qm ... serve legacy
if cidutil.IsLegacyCID(jobID) {

// storage path for Qm images is like:
// QmDirMultihash/150x150.jpg
// (no sharding)
key := jobID + "/" + variant

c.Response().Header().Set("x-variant-storage-path", key)

// have it
if blob, err := ss.bucket.NewReader(ctx, key, nil); err == nil {
return serveSuccessWithReader(blob)
}

// pull blob from another host and store it at key
// keys like QmDirMultiHash/150x150.jpg works fine with findNodeToServeBlob
startFetch := time.Now()
host, err := ss.findAndPullBlob(ctx, key)
if err != nil {
return c.String(404, err.Error())
}

c.Response().Header().Set("x-fetch-host", host)
c.Response().Header().Set("x-fetch-ok", fmt.Sprintf("%.2fs", time.Since(startFetch).Seconds()))
return serveSuccess(key)
}

msg := fmt.Sprintf("variant %s not found for upload %s", variant, jobID)
return c.String(400, msg)
}
134 changes: 1 addition & 133 deletions mediorum/server/serve_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@ package server
import (
"database/sql"
"fmt"
"io"
"mediorum/cidutil"
"mime"
"net/http"
"strconv"
"strings"
"time"

"github.com/labstack/echo/v4"
"github.com/oklog/ulid/v2"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
)

var (
filesFormFieldName = "files"
)

func (ss *MediorumServer) getUpload(c echo.Context) error {
func (ss *MediorumServer) serveUploadDetail(c echo.Context) error {
var upload *Upload
err := ss.crud.DB.First(&upload, "id = ?", c.Param("id")).Error
if err != nil {
Expand Down Expand Up @@ -218,132 +215,3 @@ func (ss *MediorumServer) postUpload(c echo.Context) error {

return c.JSON(status, uploads)
}

func (ss *MediorumServer) getBlobByJobIDAndVariant(c echo.Context) error {
// images are small enough to always serve all at once (no 206 range responses)
c.Request().Header.Del("Range")

ctx := c.Request().Context()
jobID := c.Param("jobID")
variant := c.Param("variant")
isOriginalJpg := variant == "original.jpg"

// helper function... only sets cache-control header on success
serveSuccessWithReader := func(blob *blob.Reader) error {
name := fmt.Sprintf("%s/%s", jobID, variant)
c.Response().Header().Set(echo.HeaderCacheControl, "public, max-age=2592000, immutable")
http.ServeContent(c.Response(), c.Request(), name, blob.ModTime(), blob)
return blob.Close()
}

serveSuccess := func(blobPath string) error {
if blob, err := ss.bucket.NewReader(ctx, blobPath, nil); err == nil {
return serveSuccessWithReader(blob)
} else {
return err
}
}

// if the client provided a filename, set it in the header to be auto-populated in download prompt
filenameForDownload := c.QueryParam("filename")
if filenameForDownload != "" {
contentDisposition := mime.QEncoding.Encode("utf-8", filenameForDownload)
c.Response().Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, contentDisposition))
}

// 1. resolve ulid to upload cid
if cid, err := ss.getUploadOrigCID(jobID); err == nil {
c.Response().Header().Set("x-orig-cid", cid)
jobID = cid
}

// 2. if is ba ... serve variant
if strings.HasPrefix(jobID, "ba") {
baCid := jobID

var variantStoragePath string

// parse 150x150 dimensions
// while still allowing original.jpg
w, h, err := parseVariantSize(variant)
if err == nil {
variantStoragePath = cidutil.ImageVariantPath(baCid, variant)
} else if isOriginalJpg {
variantStoragePath = cidutil.ShardCID(baCid)
} else {
return c.String(400, err.Error())
}

c.Response().Header().Set("x-variant-storage-path", variantStoragePath)

// we already have the resized version
if blob, err := ss.bucket.NewReader(ctx, variantStoragePath, nil); err == nil {
return serveSuccessWithReader(blob)
}

// open the orig for resizing
origReader, err := ss.bucket.NewReader(ctx, cidutil.ShardCID(baCid), nil)

// if we don't have orig, fetch from network
if err != nil {
startFetch := time.Now()
host, err := ss.findAndPullBlob(ctx, baCid)
if err != nil {
return c.String(404, err.Error())
}

c.Response().Header().Set("x-fetch-host", host)
c.Response().Header().Set("x-fetch-ok", fmt.Sprintf("%.2fs", time.Since(startFetch).Seconds()))

origReader, err = ss.bucket.NewReader(ctx, cidutil.ShardCID(baCid), nil)
if err != nil {
return err
}
}

// do resize if not original.jpg
if !isOriginalJpg {
resizeStart := time.Now()
resized, _, _ := Resized(".jpg", origReader, w, h, "fill")
w, _ := ss.bucket.NewWriter(ctx, variantStoragePath, nil)
io.Copy(w, resized)
w.Close()
c.Response().Header().Set("x-resize-ok", fmt.Sprintf("%.2fs", time.Since(resizeStart).Seconds()))
}
origReader.Close()

// ... serve it
return serveSuccess(variantStoragePath)
}

// 3. if Qm ... serve legacy
if cidutil.IsLegacyCID(jobID) {

// storage path for Qm images is like:
// QmDirMultihash/150x150.jpg
// (no sharding)
key := jobID + "/" + variant

c.Response().Header().Set("x-variant-storage-path", key)

// have it
if blob, err := ss.bucket.NewReader(ctx, key, nil); err == nil {
return serveSuccessWithReader(blob)
}

// pull blob from another host and store it at key
// keys like QmDirMultiHash/150x150.jpg works fine with findNodeToServeBlob
startFetch := time.Now()
host, err := ss.findAndPullBlob(ctx, key)
if err != nil {
return c.String(404, err.Error())
}

c.Response().Header().Set("x-fetch-host", host)
c.Response().Header().Set("x-fetch-ok", fmt.Sprintf("%.2fs", time.Since(startFetch).Seconds()))
return serveSuccess(key)
}

msg := fmt.Sprintf("variant %s not found for upload %s", variant, jobID)
return c.String(400, msg)
}
Loading

0 comments on commit 38525ef

Please sign in to comment.