Skip to content

Commit

Permalink
Implement ndjson HTTP delegated routing GET API
Browse files Browse the repository at this point in the history
Implement the streaming API for HTTP delegated routing. The lookup
response can now be encoded as IPNI response payload or HTTP delegated
routing `GET` provider records.
  • Loading branch information
masih committed Jan 28, 2023
1 parent aa603a6 commit 73bf595
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 170 deletions.
69 changes: 60 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# :knot: caskadht

`caskadht`, pronounced "Cascade-DHT", is a service that:
* exposes an [IPNI-compatible](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash) `GET /multihash/<multihash>` endpoint over HTTP,

* exposes:
* `GET /routing/v1/providers/<cid>` compatible with
IPFS [HTTP delegated routing](https://github.com/ipfs/specs/pull/337), and
* `GET /multihash/<multihash>` compatible
with [IPNI HTTP query API](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash)
* cascades lookup requests over the IPFS Kademlia DHT,
* uses the accelerated DHT client when possible, and
* steams the results back over `ndjson` whenever the request `Accept`s it or regular JSON otherwise.
* steams the results back over `ndjson` whenever the request `Accept` header permits it, or
non-streaming JSON otherwise.

## Install

Expand Down Expand Up @@ -38,13 +44,19 @@ To run the `caskadht` HTTP server locally, execute:
```shell
$ go run cmd/caskadht/main.go
```
The above command starts the HTTP API exposed on default listen address: `http://localhost:40080`.
You can then start looking up multihashes, which would cascade onto the DHT. Example:
```shell
$ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj -v
You can then start looking up multihashes, which would cascade onto the DHT.
To shutdown the server, interrupt the terminal by pressing `Ctrl + C`
#### Example IPNI `ndjson` response:
```text
$ curl http://localhost:40080/multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1
* Trying 127.0.0.1:40080...
* Connected to localhost (127.0.0.1) port 40080 (#0)
> GET /multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj HTTP/1.1
> GET /multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1
> Host: localhost:40080
> User-Agent: curl/7.86.0
> Accept: */*
Expand All @@ -54,14 +66,53 @@ $ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rm
< Connection: Keep-Alive
< Content-Type: application/x-ndjson
< X-Content-Type-Options: nosniff
< Date: Thu, 26 Jan 2023 12:17:02 GMT
< Date: Sat, 28 Jan 2023 18:06:44 GMT
< Transfer-Encoding: chunked
<
{"MultihashResults":[{"Multihash":"EiDXvX3xT1nGu2ZNdAM0rjm9g+/GAyirmnV5MfAVHoPLsg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWFRY4zb9Yvh7Pm5itdpVogtw2XDs68VgKwF1SkEy7eiEC","Addrs":[]}}]}]}
{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]}}]}]}

* Operation timed out after 1005 milliseconds with 1890 bytes received
* Closing connection 0
curl: (28) Operation timed out after 1005 milliseconds with 1890 bytes received
```
To shutdown the server, interrupt the terminal by pressing `Ctrl + C`
#### Example IPFS Delegated Routing `ndjson` Response
```text
$ curl http://localhost:40080/routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1
* Trying 127.0.0.1:40080...
* Connected to localhost (127.0.0.1) port 40080 (#0)
> GET /routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1
> Host: localhost:40080
> User-Agent: curl/7.86.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Connection: Keep-Alive
< Content-Type: application/x-ndjson
< X-Content-Type-Options: nosniff
< Date: Sat, 28 Jan 2023 18:07:40 GMT
< Transfer-Encoding: chunked
<
{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]}

* Operation timed out after 1001 milliseconds with 1378 bytes received
* Closing connection 0
curl: (28) Operation timed out after 1001 milliseconds with 1378 bytes received
```
## License
Expand Down
206 changes: 49 additions & 157 deletions caskadht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ package cascadht

import (
"context"
"encoding/json"
"io"
"mime"
"net"
"net/http"
"path"
"strings"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipns"
Expand All @@ -19,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
)

Expand All @@ -31,39 +26,18 @@ var (
cascadeMetadata = varint.ToUvarint(uint64(multicodec.TransportBitswap))
)

type (
Caskadht struct {
*options
std *dht.IpfsDHT
acc *fullrt.FullRT
s *http.Server
type Caskadht struct {
*options
std *dht.IpfsDHT
acc *fullrt.FullRT
s *http.Server

// Context and cancellation used to terminate streaming responses on shutdown.
ctx context.Context
cancel context.CancelFunc
}

response struct {
MultihashResults []MultihashResult
}
MultihashResult struct {
Multihash multihash.Multihash
ProviderResults []ProviderResult
}
ProviderResult struct {
ContextID []byte
Metadata []byte
Provider peer.AddrInfo
}
)

const (
ipfsProtocolPrefix = "/ipfs"
// Context and cancellation used to terminate streaming responses on shutdown.
ctx context.Context
cancel context.CancelFunc
}

mediaTypeNDJson = "application/x-ndjson"
mediaTypeJson = "application/json"
mediaTypeAny = "*/*"
)
const ipfsProtocolPrefix = "/ipfs"

func New(o ...Option) (*Caskadht, error) {
opts, err := newOptions(o...)
Expand Down Expand Up @@ -118,6 +92,7 @@ func (c *Caskadht) Start(ctx context.Context) error {
func (c *Caskadht) serveMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/multihash/", c.handleMhSubtree)
mux.HandleFunc("/routing/v1/providers/", c.handleRoutingV1ProvidersSubtree)
mux.HandleFunc("/ready", c.handleReady)
mux.HandleFunc("/", c.handleCatchAll)
return mux
Expand All @@ -126,150 +101,67 @@ func (c *Caskadht) serveMux() *http.ServeMux {
func (c *Caskadht) handleMhSubtree(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
c.handleGetMh(w, r)
c.handleLookup(newIPNILookupResponseWriter(w), r)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
}
}

func (c *Caskadht) handleGetMh(w http.ResponseWriter, r *http.Request) {
discardBody(r)

smh := strings.TrimPrefix(path.Base(r.URL.Path), "multihash/")
logger := logger.With("mh", smh)
mh, err := multihash.FromB58String(smh)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

okNDJson, okJson, err := acceptsJson(r)
if err != nil {
logger.Debugw("Failed to check accepted response media type", "err", err)
http.Error(w, "invalid Accept header", http.StatusBadRequest)
return
}
flusher, okFlusher := w.(http.Flusher)
if !okFlusher && !okJson && okNDJson {
// Respond with error if the request only accepts ndjson and the server does not support
// streaming.
http.Error(w, "server does not support streaming response", http.StatusBadRequest)
return
}
if !okJson && !okNDJson {
http.Error(w, "media type not supported", http.StatusBadRequest)
return
func (c *Caskadht) handleRoutingV1ProvidersSubtree(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
c.handleLookup(newDelegatedRoutingLookupResponseWriter(w), r)
case http.MethodPut:
discardBody(r)
http.Error(w, "", http.StatusNotImplemented)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
}
}

ctx, cancel := context.WithCancel(r.Context())
key := cid.NewCidV1(cid.Raw, mh)
pch := c.cascadeFindProviders(ctx, key)
// TODO: Decide response media type based on `q` weighting param in Accept.
// For now always prefer streaming responses.
if !okFlusher && okJson {
// We cannot stream results and the client accepts JSON; respond with non-streaming JSON.
res := MultihashResult{
Multihash: mh,
}
JSON_LOOP:
for {
select {
case <-c.ctx.Done():
break JSON_LOOP
case provider, ok := <-pch:
if !ok {
break JSON_LOOP
}
res.ProviderResults = append(res.ProviderResults, ProviderResult{
ContextID: cascadeContextID,
Metadata: cascadeMetadata,
Provider: provider,
})
}
}
cancel()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response{
MultihashResults: []MultihashResult{res},
}); err != nil {
logger.Errorw("failed to write provider results", "count", len(res.ProviderResults), "err", err)
func (c *Caskadht) handleLookup(w lookupResponseWriter, r *http.Request) {
if err := w.Accept(r); err != nil {
switch e := err.(type) {
case errHttpResponse:
e.WriteTo(w)
default:
logger.Errorw("Failed to accept lookup request", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}
return
}

w.Header().Set("Content-Type", mediaTypeNDJson)
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("X-Content-Type-Options", "nosniff")
encoder := json.NewEncoder(w)
var count int

NDJSON_LOOP:
ctx, cancel := context.WithCancel(r.Context())
pch := c.cascadeFindProviders(ctx, w.Key())
defer cancel()
LOOP:
for {
select {
case <-c.ctx.Done():
break NDJSON_LOOP
logger.Debugw("Interrupted while responding to lookup", "key", w.Key(), "err", ctx.Err())
break LOOP
case provider, ok := <-pch:
if !ok {
break NDJSON_LOOP
}
// TODO: Restructure response once there is a more optimal way to stream results.
// See: https://github.com/ipni/specs/issues/8
if err := encoder.Encode(response{
MultihashResults: []MultihashResult{
{
Multihash: mh,
ProviderResults: []ProviderResult{
{
ContextID: cascadeContextID,
Metadata: cascadeMetadata,
Provider: provider,
},
},
},
},
}); err != nil {
logger.Errorw("Failed to encode ndjson response", "err", err)
break NDJSON_LOOP
logger.Debugw("No more provider records", "key", w.Key())
break LOOP
}
if _, err := w.Write(newline); err != nil {
logger.Errorw("Failed to encode ndjson response", "err", err)
break NDJSON_LOOP
if err := w.WriteProviderRecord(providerRecord{AddrInfo: provider}); err != nil {
logger.Errorw("Failed to encode provider record", "err", err)
break LOOP
}
flusher.Flush()
count++
}
}
cancel()
logger.Debugw("Finished streaming results", "count", count)
if count == 0 {
http.Error(w, "", http.StatusNotFound)
}
}

func acceptsJson(r *http.Request) (ndjson bool, json bool, err error) {
accepts := r.Header.Values("Accept")
var mt string
for _, accept := range accepts {
mt, _, err = mime.ParseMediaType(accept)
if err != nil {
return
}
switch mt {
case mediaTypeNDJson:
ndjson = true
case mediaTypeJson:
json = true
case mediaTypeAny:
ndjson = true
json = true
}
if json && ndjson {
// Return early if both is supported.
return
if err := w.Close(); err != nil {
switch e := err.(type) {
case errHttpResponse:
e.WriteTo(w)
default:
logger.Errorw("Failed to finalize lookup results", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}
}
return
}

func (c *Caskadht) cascadeFindProviders(ctx context.Context, key cid.Cid) <-chan peer.AddrInfo {
Expand Down
16 changes: 16 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cascadht

import "net/http"

type errHttpResponse struct {
message string
status int
}

func (e errHttpResponse) Error() string {
return e.message
}

func (e errHttpResponse) WriteTo(w http.ResponseWriter) {
http.Error(w, e.message, e.status)
}
Loading

0 comments on commit 73bf595

Please sign in to comment.