diff --git a/README.md b/README.md index 65a116d..36b4a30 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,16 @@ # :knot: caskadht `caskadht`, pronounced "Cascade-DHT", is a service that: -* exposes an [IPNI-compatible](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash) `GET /multihash/` endpoint over HTTP, + +* exposes: + * `GET /routing/v1/providers/` compatible with + IPFS [HTTP delegated routing](https://github.com/ipfs/specs/pull/337), and + * `GET /multihash/` compatible + with [IPNI HTTP query API](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash) * cascades lookup requests over the IPFS Kademlia DHT, * uses the accelerated DHT client when possible, and -* steams the results back over `ndjson` whenever the request `Accept`s it or regular JSON otherwise. +* steams the results back over `ndjson` whenever the request `Accept` header permits it, or + non-streaming JSON otherwise. ## Install @@ -38,13 +44,19 @@ To run the `caskadht` HTTP server locally, execute: ```shell $ go run cmd/caskadht/main.go ``` + The above command starts the HTTP API exposed on default listen address: `http://localhost:40080`. -You can then start looking up multihashes, which would cascade onto the DHT. Example: -```shell -$ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj -v +You can then start looking up multihashes, which would cascade onto the DHT. + +To shutdown the server, interrupt the terminal by pressing `Ctrl + C` + +#### Example IPNI `ndjson` response: + +```text +$ curl http://localhost:40080/multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1 * Trying 127.0.0.1:40080... * Connected to localhost (127.0.0.1) port 40080 (#0) -> GET /multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj HTTP/1.1 +> GET /multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1 > Host: localhost:40080 > User-Agent: curl/7.86.0 > Accept: */* @@ -54,14 +66,53 @@ $ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rm < Connection: Keep-Alive < Content-Type: application/x-ndjson < X-Content-Type-Options: nosniff -< Date: Thu, 26 Jan 2023 12:17:02 GMT +< Date: Sat, 28 Jan 2023 18:06:44 GMT < Transfer-Encoding: chunked < -{"MultihashResults":[{"Multihash":"EiDXvX3xT1nGu2ZNdAM0rjm9g+/GAyirmnV5MfAVHoPLsg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWFRY4zb9Yvh7Pm5itdpVogtw2XDs68VgKwF1SkEy7eiEC","Addrs":[]}}]}]} +{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]}}]}]} + +{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]}}]}]} +{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]}}]}]} + +{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]}}]}]} + +* Operation timed out after 1005 milliseconds with 1890 bytes received +* Closing connection 0 +curl: (28) Operation timed out after 1005 milliseconds with 1890 bytes received ``` -To shutdown the server, interrupt the terminal by pressing `Ctrl + C` +#### Example IPFS Delegated Routing `ndjson` Response + +```text +$ curl http://localhost:40080/routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1 +* Trying 127.0.0.1:40080... +* Connected to localhost (127.0.0.1) port 40080 (#0) +> GET /routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1 +> Host: localhost:40080 +> User-Agent: curl/7.86.0 +> Accept: */* +> +* Mark bundle as not supporting multiuse +< HTTP/1.1 200 OK +< Connection: Keep-Alive +< Content-Type: application/x-ndjson +< X-Content-Type-Options: nosniff +< Date: Sat, 28 Jan 2023 18:07:40 GMT +< Transfer-Encoding: chunked +< +{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]} + +{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]} + +{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]} + +{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]} + +* Operation timed out after 1001 milliseconds with 1378 bytes received +* Closing connection 0 +curl: (28) Operation timed out after 1001 milliseconds with 1378 bytes received +``` ## License diff --git a/caskadht.go b/caskadht.go index b8524a2..7bed469 100644 --- a/caskadht.go +++ b/caskadht.go @@ -2,13 +2,9 @@ package cascadht import ( "context" - "encoding/json" "io" - "mime" "net" "net/http" - "path" - "strings" "github.com/ipfs/go-cid" "github.com/ipfs/go-ipns" @@ -19,7 +15,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" "github.com/multiformats/go-varint" ) @@ -31,39 +26,18 @@ var ( cascadeMetadata = varint.ToUvarint(uint64(multicodec.TransportBitswap)) ) -type ( - Caskadht struct { - *options - std *dht.IpfsDHT - acc *fullrt.FullRT - s *http.Server +type Caskadht struct { + *options + std *dht.IpfsDHT + acc *fullrt.FullRT + s *http.Server - // Context and cancellation used to terminate streaming responses on shutdown. - ctx context.Context - cancel context.CancelFunc - } - - response struct { - MultihashResults []MultihashResult - } - MultihashResult struct { - Multihash multihash.Multihash - ProviderResults []ProviderResult - } - ProviderResult struct { - ContextID []byte - Metadata []byte - Provider peer.AddrInfo - } -) - -const ( - ipfsProtocolPrefix = "/ipfs" + // Context and cancellation used to terminate streaming responses on shutdown. + ctx context.Context + cancel context.CancelFunc +} - mediaTypeNDJson = "application/x-ndjson" - mediaTypeJson = "application/json" - mediaTypeAny = "*/*" -) +const ipfsProtocolPrefix = "/ipfs" func New(o ...Option) (*Caskadht, error) { opts, err := newOptions(o...) @@ -118,6 +92,7 @@ func (c *Caskadht) Start(ctx context.Context) error { func (c *Caskadht) serveMux() *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("/multihash/", c.handleMhSubtree) + mux.HandleFunc("/routing/v1/providers/", c.handleRoutingV1ProvidersSubtree) mux.HandleFunc("/ready", c.handleReady) mux.HandleFunc("/", c.handleCatchAll) return mux @@ -126,150 +101,67 @@ func (c *Caskadht) serveMux() *http.ServeMux { func (c *Caskadht) handleMhSubtree(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - c.handleGetMh(w, r) + c.handleLookup(newIPNILookupResponseWriter(w), r) default: discardBody(r) http.Error(w, "", http.StatusNotFound) } } -func (c *Caskadht) handleGetMh(w http.ResponseWriter, r *http.Request) { - discardBody(r) - - smh := strings.TrimPrefix(path.Base(r.URL.Path), "multihash/") - logger := logger.With("mh", smh) - mh, err := multihash.FromB58String(smh) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - okNDJson, okJson, err := acceptsJson(r) - if err != nil { - logger.Debugw("Failed to check accepted response media type", "err", err) - http.Error(w, "invalid Accept header", http.StatusBadRequest) - return - } - flusher, okFlusher := w.(http.Flusher) - if !okFlusher && !okJson && okNDJson { - // Respond with error if the request only accepts ndjson and the server does not support - // streaming. - http.Error(w, "server does not support streaming response", http.StatusBadRequest) - return - } - if !okJson && !okNDJson { - http.Error(w, "media type not supported", http.StatusBadRequest) - return +func (c *Caskadht) handleRoutingV1ProvidersSubtree(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + c.handleLookup(newDelegatedRoutingLookupResponseWriter(w), r) + case http.MethodPut: + discardBody(r) + http.Error(w, "", http.StatusNotImplemented) + default: + discardBody(r) + http.Error(w, "", http.StatusNotFound) } +} - ctx, cancel := context.WithCancel(r.Context()) - key := cid.NewCidV1(cid.Raw, mh) - pch := c.cascadeFindProviders(ctx, key) - // TODO: Decide response media type based on `q` weighting param in Accept. - // For now always prefer streaming responses. - if !okFlusher && okJson { - // We cannot stream results and the client accepts JSON; respond with non-streaming JSON. - res := MultihashResult{ - Multihash: mh, - } - JSON_LOOP: - for { - select { - case <-c.ctx.Done(): - break JSON_LOOP - case provider, ok := <-pch: - if !ok { - break JSON_LOOP - } - res.ProviderResults = append(res.ProviderResults, ProviderResult{ - ContextID: cascadeContextID, - Metadata: cascadeMetadata, - Provider: provider, - }) - } - } - cancel() - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response{ - MultihashResults: []MultihashResult{res}, - }); err != nil { - logger.Errorw("failed to write provider results", "count", len(res.ProviderResults), "err", err) +func (c *Caskadht) handleLookup(w lookupResponseWriter, r *http.Request) { + if err := w.Accept(r); err != nil { + switch e := err.(type) { + case errHttpResponse: + e.WriteTo(w) + default: + logger.Errorw("Failed to accept lookup request", "err", err) + http.Error(w, "", http.StatusInternalServerError) } return } - w.Header().Set("Content-Type", mediaTypeNDJson) - w.Header().Set("Connection", "Keep-Alive") - w.Header().Set("X-Content-Type-Options", "nosniff") - encoder := json.NewEncoder(w) - var count int - -NDJSON_LOOP: + ctx, cancel := context.WithCancel(r.Context()) + pch := c.cascadeFindProviders(ctx, w.Key()) + defer cancel() +LOOP: for { select { case <-c.ctx.Done(): - break NDJSON_LOOP + logger.Debugw("Interrupted while responding to lookup", "key", w.Key(), "err", ctx.Err()) + break LOOP case provider, ok := <-pch: if !ok { - break NDJSON_LOOP - } - // TODO: Restructure response once there is a more optimal way to stream results. - // See: https://github.com/ipni/specs/issues/8 - if err := encoder.Encode(response{ - MultihashResults: []MultihashResult{ - { - Multihash: mh, - ProviderResults: []ProviderResult{ - { - ContextID: cascadeContextID, - Metadata: cascadeMetadata, - Provider: provider, - }, - }, - }, - }, - }); err != nil { - logger.Errorw("Failed to encode ndjson response", "err", err) - break NDJSON_LOOP + logger.Debugw("No more provider records", "key", w.Key()) + break LOOP } - if _, err := w.Write(newline); err != nil { - logger.Errorw("Failed to encode ndjson response", "err", err) - break NDJSON_LOOP + if err := w.WriteProviderRecord(providerRecord{AddrInfo: provider}); err != nil { + logger.Errorw("Failed to encode provider record", "err", err) + break LOOP } - flusher.Flush() - count++ } } - cancel() - logger.Debugw("Finished streaming results", "count", count) - if count == 0 { - http.Error(w, "", http.StatusNotFound) - } -} - -func acceptsJson(r *http.Request) (ndjson bool, json bool, err error) { - accepts := r.Header.Values("Accept") - var mt string - for _, accept := range accepts { - mt, _, err = mime.ParseMediaType(accept) - if err != nil { - return - } - switch mt { - case mediaTypeNDJson: - ndjson = true - case mediaTypeJson: - json = true - case mediaTypeAny: - ndjson = true - json = true - } - if json && ndjson { - // Return early if both is supported. - return + if err := w.Close(); err != nil { + switch e := err.(type) { + case errHttpResponse: + e.WriteTo(w) + default: + logger.Errorw("Failed to finalize lookup results", "err", err) + http.Error(w, "", http.StatusInternalServerError) } } - return } func (c *Caskadht) cascadeFindProviders(ctx context.Context, key cid.Cid) <-chan peer.AddrInfo { diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..e0a4a76 --- /dev/null +++ b/errors.go @@ -0,0 +1,16 @@ +package cascadht + +import "net/http" + +type errHttpResponse struct { + message string + status int +} + +func (e errHttpResponse) Error() string { + return e.message +} + +func (e errHttpResponse) WriteTo(w http.ResponseWriter) { + http.Error(w, e.message, e.status) +} diff --git a/go.mod b/go.mod index 43cabfc..258efa2 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,12 @@ go 1.19 require ( github.com/ipfs/go-cid v0.3.2 - github.com/ipfs/go-ipns v0.2.0 + github.com/ipfs/go-ipns v0.3.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.24.2 github.com/libp2p/go-libp2p-kad-dht v0.20.0 github.com/libp2p/go-libp2p-record v0.2.0 + github.com/multiformats/go-multiaddr v0.8.0 github.com/multiformats/go-multicodec v0.7.0 github.com/multiformats/go-multihash v0.2.1 github.com/multiformats/go-varint v0.0.7 @@ -77,7 +78,6 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.1.1 // indirect diff --git a/go.sum b/go.sum index d973324..17f23fb 100644 --- a/go.sum +++ b/go.sum @@ -269,8 +269,8 @@ github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1Y github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= -github.com/ipfs/go-ipns v0.2.0 h1:BgmNtQhqOw5XEZ8RAfWEpK4DhqaYiuP6h71MhIp7xXU= -github.com/ipfs/go-ipns v0.2.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= +github.com/ipfs/go-ipns v0.3.0 h1:ai791nTgVo+zTuq2bLvEGmWP1M0A6kGTXUsgv/Yq67A= +github.com/ipfs/go-ipns v0.3.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= diff --git a/response_writer.go b/response_writer.go new file mode 100644 index 0000000..e85cec4 --- /dev/null +++ b/response_writer.go @@ -0,0 +1,25 @@ +package cascadht + +import ( + "io" + "net/http" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" +) + +type ( + providerRecord struct { + peer.AddrInfo + } + acceptor interface { + Accept(r *http.Request) error + } + lookupResponseWriter interface { + io.Closer + http.ResponseWriter + acceptor + Key() cid.Cid + WriteProviderRecord(providerRecord) error + } +) diff --git a/response_writer_dr.go b/response_writer_dr.go new file mode 100644 index 0000000..91b6fb4 --- /dev/null +++ b/response_writer_dr.go @@ -0,0 +1,90 @@ +package cascadht + +import ( + "net/http" + "path" + "strings" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multicodec" +) + +var ( + _ lookupResponseWriter = (*delegatedRoutingLookupResponseWriter)(nil) + + drProtocolBitswap = multicodec.TransportBitswap.String() +) + +const drSchemaBitswap = "bitswap" + +type ( + delegatedRoutingLookupResponseWriter struct { + jsonAcceptor + key cid.Cid + result drProviderRecords + } + drProviderRecords struct { + Providers []drProviderRecord + } + drProviderRecord struct { + Protocol string + Schema string + ID peer.ID + Addrs []multiaddr.Multiaddr + } +) + +func newDelegatedRoutingLookupResponseWriter(w http.ResponseWriter) lookupResponseWriter { + return &delegatedRoutingLookupResponseWriter{ + jsonAcceptor: newJsonAcceptor(w), + } +} + +func (d *delegatedRoutingLookupResponseWriter) Accept(r *http.Request) error { + if err := d.jsonAcceptor.Accept(r); err != nil { + return err + } + sc := strings.TrimPrefix(path.Base(r.URL.Path), "routing/v1/providers/") + var err error + d.key, err = cid.Decode(sc) + if err != nil { + return errHttpResponse{message: err.Error(), status: http.StatusBadRequest} + } + return nil +} +func (d *delegatedRoutingLookupResponseWriter) Key() cid.Cid { + return d.key +} + +func (d *delegatedRoutingLookupResponseWriter) WriteProviderRecord(provider providerRecord) error { + rec := drProviderRecord{ + Protocol: drProtocolBitswap, + Schema: drSchemaBitswap, + ID: provider.ID, + Addrs: provider.Addrs, + } + if d.nd { + if err := d.encoder.Encode(rec); err != nil { + logger.Errorw("Failed to encode ndjson response", "err", err) + return err + } + if _, err := d.w.Write(newline); err != nil { + logger.Errorw("Failed to encode ndjson response", "err", err) + return err + } + if d.f != nil { + d.f.Flush() + } + } + d.result.Providers = append(d.result.Providers, rec) + return nil +} + +func (d *delegatedRoutingLookupResponseWriter) Close() error { + if d.nd { + return nil + } + return d.encoder.Encode(d.result) +} diff --git a/response_writer_ipni.go b/response_writer_ipni.go new file mode 100644 index 0000000..08158fe --- /dev/null +++ b/response_writer_ipni.go @@ -0,0 +1,100 @@ +package cascadht + +import ( + "net/http" + "path" + "strings" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multihash" +) + +var _ lookupResponseWriter = (*ipniLookupResponseWriter)(nil) + +type ( + ipniLookupResponseWriter struct { + jsonAcceptor + result MultihashResult + count int + } + ipniResults struct { + MultihashResults []MultihashResult + } + MultihashResult struct { + Multihash multihash.Multihash + ProviderResults []ProviderResult + } + ProviderResult struct { + ContextID []byte + Metadata []byte + Provider peer.AddrInfo + } +) + +func newIPNILookupResponseWriter(w http.ResponseWriter) lookupResponseWriter { + return &ipniLookupResponseWriter{ + jsonAcceptor: newJsonAcceptor(w), + } +} + +func (i *ipniLookupResponseWriter) Accept(r *http.Request) error { + if err := i.jsonAcceptor.Accept(r); err != nil { + return err + } + smh := strings.TrimPrefix(path.Base(r.URL.Path), "multihash/") + var err error + i.result.Multihash, err = multihash.FromB58String(smh) + if err != nil { + return errHttpResponse{message: err.Error(), status: http.StatusBadRequest} + } + return nil +} + +func (i *ipniLookupResponseWriter) Key() cid.Cid { + return cid.NewCidV1(cid.Raw, i.result.Multihash) +} + +func (i *ipniLookupResponseWriter) WriteProviderRecord(provider providerRecord) error { + rec := ProviderResult{ + ContextID: cascadeContextID, + Metadata: cascadeMetadata, + Provider: provider.AddrInfo, + } + if i.nd { + if err := i.encoder.Encode(ipniResults{ + MultihashResults: []MultihashResult{ + { + Multihash: i.result.Multihash, + ProviderResults: []ProviderResult{rec}, + }, + }, + }); err != nil { + logger.Errorw("Failed to encode ndjson response", "err", err) + return err + } + if _, err := i.w.Write(newline); err != nil { + logger.Errorw("Failed to encode ndjson response", "err", err) + return err + } + if i.f != nil { + i.f.Flush() + } + i.count++ + } + i.result.ProviderResults = append(i.result.ProviderResults, rec) + return nil +} + +func (i *ipniLookupResponseWriter) Close() error { + if i.count == 0 { + return errHttpResponse{status: http.StatusNotFound} + } + logger.Debugw("Finished writing ipni results", "count", i.count) + if i.nd { + return nil + } + return i.encoder.Encode(ipniResults{ + MultihashResults: []MultihashResult{i.result}, + }) +} diff --git a/response_writer_json.go b/response_writer_json.go new file mode 100644 index 0000000..896799a --- /dev/null +++ b/response_writer_json.go @@ -0,0 +1,88 @@ +package cascadht + +import ( + "encoding/json" + "mime" + "net/http" +) + +const ( + mediaTypeNDJson = "application/x-ndjson" + mediaTypeJson = "application/json" + mediaTypeAny = "*/*" +) + +var ( + _ acceptor = (*jsonAcceptor)(nil) + _ http.ResponseWriter = (*jsonAcceptor)(nil) +) + +type jsonAcceptor struct { + w http.ResponseWriter + f http.Flusher + encoder *json.Encoder + nd bool +} + +func newJsonAcceptor(w http.ResponseWriter) jsonAcceptor { + return jsonAcceptor{ + w: w, + encoder: json.NewEncoder(w), + } +} + +func (i *jsonAcceptor) Accept(r *http.Request) error { + accepts := r.Header.Values("Accept") + var okJson bool + for _, accept := range accepts { + mt, _, err := mime.ParseMediaType(accept) + if err != nil { + logger.Debugw("Failed to check accepted response media type", "err", err) + return errHttpResponse{message: "invalid Accept header", status: http.StatusBadRequest} + } + switch mt { + case mediaTypeNDJson: + i.nd = true + case mediaTypeJson: + okJson = true + case mediaTypeAny: + i.nd = true + okJson = true + } + if i.nd && okJson { + break + } + } + + var okFlusher bool + i.f, okFlusher = i.w.(http.Flusher) + if !okFlusher && !okJson && i.nd { + // Respond with error if the request only accepts ndjson and the server does not support + // streaming. + return errHttpResponse{message: "server does not support streaming response", status: http.StatusBadRequest} + } + if !okJson && !i.nd { + return errHttpResponse{message: "media type not supported", status: http.StatusBadRequest} + } + + if i.nd { + i.w.Header().Set("Content-Type", mediaTypeNDJson) + i.w.Header().Set("Connection", "Keep-Alive") + i.w.Header().Set("X-Content-Type-Options", "nosniff") + } else { + i.w.Header().Set("Content-Type", mediaTypeJson) + } + return nil +} + +func (i *jsonAcceptor) Header() http.Header { + return i.w.Header() +} + +func (i *jsonAcceptor) Write(b []byte) (int, error) { + return i.w.Write(b) +} + +func (i *jsonAcceptor) WriteHeader(code int) { + i.w.WriteHeader(code) +}