Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

booster-http: implement IPFS HTTP gateway #1225

Merged
merged 6 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/filecoin-project/boost/api"
bclient "github.com/filecoin-project/boost/api/client"
cliutil "github.com/filecoin-project/boost/cli/util"
"github.com/filecoin-project/boost/cmd/booster-bitswap/filters"
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
"github.com/filecoin-project/boost/cmd/lib/filters"
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/go-jsonrpc"
Expand Down Expand Up @@ -137,7 +137,19 @@ var runCmd = &cli.Command{
}
defer bcloser()

remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
bitswapBlockMetrics := remoteblockstore.BlockMetrics{
GetRequestCount: metrics.BitswapRblsGetRequestCount,
GetFailResponseCount: metrics.BitswapRblsGetFailResponseCount,
GetSuccessResponseCount: metrics.BitswapRblsGetSuccessResponseCount,
BytesSentCount: metrics.BitswapRblsBytesSentCount,
HasRequestCount: metrics.BitswapRblsHasRequestCount,
HasFailResponseCount: metrics.BitswapRblsHasFailResponseCount,
HasSuccessResponseCount: metrics.BitswapRblsHasSuccessResponseCount,
GetSizeRequestCount: metrics.BitswapRblsGetSizeRequestCount,
GetSizeFailResponseCount: metrics.BitswapRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.BitswapRblsGetSizeSuccessResponseCount,
}
remoteStore := remoteblockstore.NewRemoteBlockstore(bapi, bitswapBlockMetrics)
// Create the server API
port := cctx.Int("port")
repoDir, err := homedir.Expand(cctx.String(FlagRepo.Name))
Expand Down
237 changes: 237 additions & 0 deletions cmd/booster-http/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package main

// Copied from
// https://github.com/ipfs/go-libipfs/blob/c76138366f1a7c416c481a89b3e71ed29e81a641/examples/gateway/common/blocks.go

import (
"context"
"errors"
"fmt"
gopath "path"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
bsfetcher "github.com/ipfs/go-fetcher/impl/blockservice"
blockstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/files"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-namesys"
"github.com/ipfs/go-namesys/resolve"
ipfspath "github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver"
"github.com/ipfs/go-unixfs"
ufile "github.com/ipfs/go-unixfs/file"
uio "github.com/ipfs/go-unixfs/io"
"github.com/ipfs/go-unixfsnode"
iface "github.com/ipfs/interface-go-ipfs-core"
nsopts "github.com/ipfs/interface-go-ipfs-core/options/namesys"
ifacepath "github.com/ipfs/interface-go-ipfs-core/path"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
mc "github.com/multiformats/go-multicodec"
)

type BlocksGateway struct {
blockStore blockstore.Blockstore
blockService blockservice.BlockService
dagService format.DAGService
resolver resolver.Resolver

// Optional routing system to handle /ipns addresses.
namesys namesys.NameSystem
routing routing.ValueStore
}

func NewBlocksGateway(blockService blockservice.BlockService, routing routing.ValueStore) (*BlocksGateway, error) {
// Setup the DAG services, which use the CAR block store.
dagService := merkledag.NewDAGService(blockService)

// Setup the UnixFS resolver.
fetcherConfig := bsfetcher.NewFetcherConfig(blockService)
fetcherConfig.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
fetcher := fetcherConfig.WithReifier(unixfsnode.Reify)
resolver := resolver.NewBasicResolver(fetcher)

// Setup a name system so that we are able to resolve /ipns links.
var (
ns namesys.NameSystem
err error
)
if routing != nil {
ns, err = namesys.NewNameSystem(routing)
if err != nil {
return nil, err
}
}

return &BlocksGateway{
blockStore: blockService.Blockstore(),
blockService: blockService,
dagService: dagService,
resolver: resolver,
routing: routing,
namesys: ns,
}, nil
}

func (api *BlocksGateway) GetUnixFsNode(ctx context.Context, p ifacepath.Resolved) (files.Node, error) {
nd, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

return ufile.NewUnixfsFile(ctx, api.dagService, nd)
}

func (api *BlocksGateway) LsUnixFsDir(ctx context.Context, p ifacepath.Resolved) (<-chan iface.DirEntry, error) {
node, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

dir, err := uio.NewDirectoryFromNode(api.dagService, node)
if err != nil {
return nil, err
}

out := make(chan iface.DirEntry, uio.DefaultShardWidth)

go func() {
defer close(out)
for l := range dir.EnumLinksAsync(ctx) {
select {
case out <- api.processLink(ctx, l):
case <-ctx.Done():
return
}
}
}()

return out, nil
}

func (api *BlocksGateway) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return api.blockService.GetBlock(ctx, c)
}

func (api *BlocksGateway) GetIPNSRecord(ctx context.Context, c cid.Cid) ([]byte, error) {
if api.routing == nil {
return nil, routing.ErrNotSupported
}

// Fails fast if the CID is not an encoded Libp2p Key, avoids wasteful
// round trips to the remote routing provider.
if mc.Code(c.Type()) != mc.Libp2pKey {
return nil, errors.New("provided cid is not an encoded libp2p key")
}

// The value store expects the key itself to be encoded as a multihash.
id, err := peer.FromCid(c)
if err != nil {
return nil, err
}

return api.routing.GetValue(ctx, "/ipns/"+string(id))
}

func (api *BlocksGateway) GetDNSLinkRecord(ctx context.Context, hostname string) (ifacepath.Path, error) {
if api.namesys != nil {
p, err := api.namesys.Resolve(ctx, "/ipns/"+hostname, nsopts.Depth(1))
if err == namesys.ErrResolveRecursion {
err = nil
}
return ifacepath.New(p.String()), err
}

return nil, errors.New("not implemented")
}

func (api *BlocksGateway) IsCached(ctx context.Context, p ifacepath.Path) bool {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return false
}

has, _ := api.blockStore.Has(ctx, rp.Cid())
return has
}

func (api *BlocksGateway) ResolvePath(ctx context.Context, p ifacepath.Path) (ifacepath.Resolved, error) {
if _, ok := p.(ifacepath.Resolved); ok {
return p.(ifacepath.Resolved), nil
}

err := p.IsValid()
if err != nil {
return nil, err
}

ipath := ipfspath.Path(p.String())
if ipath.Segments()[0] == "ipns" {
ipath, err = resolve.ResolveIPNS(ctx, api.namesys, ipath)
if err != nil {
return nil, err
}
}

if ipath.Segments()[0] != "ipfs" {
return nil, fmt.Errorf("unsupported path namespace: %s", p.Namespace())
}

node, rest, err := api.resolver.ResolveToLastNode(ctx, ipath)
if err != nil {
return nil, err
}

root, err := cid.Parse(ipath.Segments()[1])
if err != nil {
return nil, err
}

return ifacepath.NewResolvedPath(ipath, node, root, gopath.Join(rest...)), nil
}

func (api *BlocksGateway) resolveNode(ctx context.Context, p ifacepath.Path) (format.Node, error) {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}

node, err := api.dagService.Get(ctx, rp.Cid())
if err != nil {
return nil, fmt.Errorf("get node: %w", err)
}
return node, nil
}

func (api *BlocksGateway) processLink(ctx context.Context, result unixfs.LinkResult) iface.DirEntry {
if result.Err != nil {
return iface.DirEntry{Err: result.Err}
}

link := iface.DirEntry{
Name: result.Link.Name,
Cid: result.Link.Cid,
}

switch link.Cid.Type() {
case cid.Raw:
link.Type = iface.TFile
link.Size = result.Link.Size
case cid.DagProtobuf:
link.Size = result.Link.Size
}

return link
}
101 changes: 101 additions & 0 deletions cmd/booster-http/gateway_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"fmt"
"github.com/ipfs/go-libipfs/gateway"
"mime"
"net/http"
"strings"
)

type gatewayHandler struct {
gwh http.Handler
supportedFormats map[string]struct{}
}

func newGatewayHandler(gw *BlocksGateway, supportedFormats []string) http.Handler {
headers := map[string][]string{}
gateway.AddAccessControlHeaders(headers)

fmtsMap := make(map[string]struct{}, len(supportedFormats))
for _, f := range supportedFormats {
fmtsMap[f] = struct{}{}
}

return &gatewayHandler{
gwh: gateway.NewHandler(gateway.Config{Headers: headers}, gw),
supportedFormats: fmtsMap,
}
}

func (h *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responseFormat, _, err := customResponseFormat(r)
if err != nil {
webError(w, fmt.Errorf("error while processing the Accept header: %w", err), http.StatusBadRequest)
return
}

if _, ok := h.supportedFormats[responseFormat]; !ok {
if responseFormat == "" {
responseFormat = "unixfs"
}
webError(w, fmt.Errorf("unsupported response format: %s", responseFormat), http.StatusBadRequest)
return
}

h.gwh.ServeHTTP(w, r)
}

func webError(w http.ResponseWriter, err error, code int) {
http.Error(w, err.Error(), code)
}

// Unfortunately this function is not exported from go-libipfs so we need to copy it here.
// return explicit response format if specified in request as query parameter or via Accept HTTP header
func customResponseFormat(r *http.Request) (mediaType string, params map[string]string, err error) {
if formatParam := r.URL.Query().Get("format"); formatParam != "" {
// translate query param to a content type
switch formatParam {
case "raw":
return "application/vnd.ipld.raw", nil, nil
case "car":
return "application/vnd.ipld.car", nil, nil
case "tar":
return "application/x-tar", nil, nil
case "json":
return "application/json", nil, nil
case "cbor":
return "application/cbor", nil, nil
case "dag-json":
return "application/vnd.ipld.dag-json", nil, nil
case "dag-cbor":
return "application/vnd.ipld.dag-cbor", nil, nil
case "ipns-record":
return "application/vnd.ipfs.ipns-record", nil, nil
}
}
// Browsers and other user agents will send Accept header with generic types like:
// Accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8
// We only care about explicit, vendor-specific content-types and respond to the first match (in order).
// TODO: make this RFC compliant and respect weights (eg. return CAR for Accept:application/vnd.ipld.dag-json;q=0.1,application/vnd.ipld.car;q=0.2)
for _, header := range r.Header.Values("Accept") {
for _, value := range strings.Split(header, ",") {
accept := strings.TrimSpace(value)
// respond to the very first matching content type
if strings.HasPrefix(accept, "application/vnd.ipld") ||
strings.HasPrefix(accept, "application/x-tar") ||
strings.HasPrefix(accept, "application/json") ||
strings.HasPrefix(accept, "application/cbor") ||
strings.HasPrefix(accept, "application/vnd.ipfs") {
mediatype, params, err := mime.ParseMediaType(accept)
if err != nil {
return "", nil, err
}
return mediatype, params, nil
}
}
}
// If none of special-cased content types is found, return empty string
// to indicate default, implicit UnixFS response should be prepared
return "", nil, nil
}
Loading