Implement ndjson HTTP delegated routing GET API #5

Merged 1 commit on Jan 29, 2023
69 changes: 60 additions & 9 deletions README.md
@@ -1,10 +1,16 @@
# :knot: caskadht

`caskadht`, pronounced "Cascade-DHT", is a service that:
* exposes an [IPNI-compatible](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash) `GET /multihash/<multihash>` endpoint over HTTP,

* exposes:
* `GET /routing/v1/providers/<cid>` compatible with
IPFS [HTTP delegated routing](https://github.com/ipfs/specs/pull/337), and
* `GET /multihash/<multihash>` compatible
with [IPNI HTTP query API](https://github.com/ipni/specs/blob/main/IPNI.md#get-multihashmultihash)
* cascades lookup requests over the IPFS Kademlia DHT,
* uses the accelerated DHT client when possible, and
* streams the results back over `ndjson` whenever the request `Accept`s it or regular JSON otherwise.
* streams the results back over `ndjson` whenever the request `Accept` header permits it, or
non-streaming JSON otherwise.
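
The `Accept`-based choice between `ndjson` and plain JSON described above can be sketched as follows. This is a minimal illustration, not the code in this PR: the function name `acceptedTypes` is hypothetical, an absent header is assumed to mean `*/*`, and `q` weights are parsed but ignored. Unlike a check that parses each header line whole, it splits comma-separated media ranges first.

```go
package main

import (
	"fmt"
	"mime"
	"strings"
)

const (
	mediaTypeNDJSON = "application/x-ndjson"
	mediaTypeJSON   = "application/json"
	mediaTypeAny    = "*/*"
)

// acceptedTypes (hypothetical helper) reports whether the Accept header
// values permit ndjson and/or plain JSON responses.
// Assumption: a request without an Accept header accepts anything.
func acceptedTypes(values []string) (ndjson, json bool, err error) {
	if len(values) == 0 {
		return true, true, nil
	}
	for _, v := range values {
		// One header line may list several comma-separated media ranges.
		for _, part := range strings.Split(v, ",") {
			part = strings.TrimSpace(part)
			if part == "" {
				continue
			}
			// Parameters such as q=0.9 are parsed and discarded here.
			mt, _, perr := mime.ParseMediaType(part)
			if perr != nil {
				return false, false, perr
			}
			switch mt {
			case mediaTypeNDJSON:
				ndjson = true
			case mediaTypeJSON:
				json = true
			case mediaTypeAny:
				ndjson, json = true, true
			}
		}
	}
	return ndjson, json, nil
}

func main() {
	nd, js, err := acceptedTypes([]string{"application/json;q=0.9, application/x-ndjson"})
	fmt.Println(nd, js, err)
}
```

A server that prefers streaming would pick `ndjson` whenever the first result is true and fall back to plain JSON otherwise.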

## Install

@@ -38,13 +44,19 @@ To run the `caskadht` HTTP server locally, execute:
```shell
$ go run cmd/caskadht/main.go
```

The above command starts the HTTP API on the default listen address: `http://localhost:40080`.
You can then start looking up multihashes, which would cascade onto the DHT. Example:
```shell
$ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj -v
You can then start looking up multihashes, which would cascade onto the DHT.

To shut down the server, interrupt the terminal by pressing `Ctrl + C`.

#### Example IPNI `ndjson` Response

```text
$ curl http://localhost:40080/multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1
* Trying 127.0.0.1:40080...
* Connected to localhost (127.0.0.1) port 40080 (#0)
> GET /multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rmuvugfj HTTP/1.1
> GET /multihash/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1
> Host: localhost:40080
> User-Agent: curl/7.86.0
> Accept: */*
@@ -54,14 +66,53 @@ $ curl http://localhost:40080/multihash/QmcroxBV9PBPUg2LfeusC25x1C4mckSmQy6hD5rm
< Connection: Keep-Alive
< Content-Type: application/x-ndjson
< X-Content-Type-Options: nosniff
< Date: Thu, 26 Jan 2023 12:17:02 GMT
< Date: Sat, 28 Jan 2023 18:06:44 GMT
< Transfer-Encoding: chunked
<
{"MultihashResults":[{"Multihash":"EiDXvX3xT1nGu2ZNdAM0rjm9g+/GAyirmnV5MfAVHoPLsg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWFRY4zb9Yvh7Pm5itdpVogtw2XDs68VgKwF1SkEy7eiEC","Addrs":[]}}]}]}
{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]}}]}]}

{"MultihashResults":[{"Multihash":"EiD9hrEwE3KHRG4OfXY7il3OXAFiW/oAuFBR5Rv0E9CFCg==","ProviderResults":[{"ContextID":"aXBmcy1kaHQtY2FzY2FkZQ==","Metadata":"gBI=","Provider":{"ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]}}]}]}

* Operation timed out after 1005 milliseconds with 1890 bytes received
* Closing connection 0
curl: (28) Operation timed out after 1005 milliseconds with 1890 bytes received
```
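
The `ContextID` and `Metadata` fields in the response above are base64 strings because Go's `encoding/json` encodes `[]byte` values that way. The sketch below decodes the two values from the sample output using only the standard library. The helper names are hypothetical, and reading `Metadata` as a single unsigned varint is an assumption based on `cascadeMetadata` being built with `varint.ToUvarint` in `caskadht.go`.

```go
package main

import (
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeContextID (hypothetical helper) returns the raw ContextID bytes as a string.
func decodeContextID(b64 string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(b64)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

// decodeMetadataCode (hypothetical helper) reads the leading unsigned varint
// from the Metadata bytes. Assumption: the metadata starts with a multicodec code.
func decodeMetadataCode(b64 string) (uint64, error) {
	raw, err := base64.StdEncoding.DecodeString(b64)
	if err != nil {
		return 0, err
	}
	code, n := binary.Uvarint(raw)
	if n <= 0 {
		return 0, errors.New("metadata does not start with a valid varint")
	}
	return code, nil
}

func main() {
	ctxID, err := decodeContextID("aXBmcy1kaHQtY2FzY2FkZQ==")
	if err != nil {
		panic(err)
	}
	code, err := decodeMetadataCode("gBI=")
	if err != nil {
		panic(err)
	}
	// Prints: ContextID="ipfs-dht-cascade" Metadata=0x900
	fmt.Printf("ContextID=%q Metadata=0x%x\n", ctxID, code)
}
```

The value `0x900` is the varint that `caskadht.go` derives from `multicodec.TransportBitswap`.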

To shut down the server, interrupt the terminal by pressing `Ctrl + C`.
#### Example IPFS Delegated Routing `ndjson` Response

```text
$ curl http://localhost:40080/routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm -v --max-time 1
* Trying 127.0.0.1:40080...
* Connected to localhost (127.0.0.1) port 40080 (#0)
> GET /routing/v1/providers/QmfQJymEUXsGNzHMmpGYmUcFiAtGw2ia97EXNDVDZbZjgm HTTP/1.1
> Host: localhost:40080
> User-Agent: curl/7.86.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Connection: Keep-Alive
< Content-Type: application/x-ndjson
< X-Content-Type-Options: nosniff
< Date: Sat, 28 Jan 2023 18:07:40 GMT
< Transfer-Encoding: chunked
<
{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWHVXoJnv2ifmr9K6LWwJPXxkfvzZRHzjiTZMvybeTnwPy","Addrs":["/ip4/145.40.89.101/tcp/4001","/ip4/145.40.89.101/tcp/4002/ws","/ip4/145.40.89.101/udp/4001/quic","/ip6/2604:1380:45f1:d800::1/tcp/4001","/ip6/2604:1380:45f1:d800::1/tcp/4002/ws","/ip6/2604:1380:45f1:d800::1/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWDpp7U7W9Q8feMZPPEpPP5FKXTUakLgnVLbavfjb9mzrT","Addrs":["/ip4/147.75.80.75/tcp/4001","/ip4/147.75.80.75/tcp/4002/ws","/ip4/147.75.80.75/udp/4001/quic","/ip6/2604:1380:4601:f600::5/tcp/4001","/ip6/2604:1380:4601:f600::5/tcp/4002/ws","/ip6/2604:1380:4601:f600::5/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWCrBiagtZMzpZePCr1tfBbrZTh4BRQf7JurRqNMRi8YHF","Addrs":["/ip4/147.75.87.65/tcp/4001","/ip4/147.75.87.65/tcp/4002/ws","/ip4/147.75.87.65/udp/4001/quic","/ip6/2604:1380:4601:f600::1/tcp/4001","/ip6/2604:1380:4601:f600::1/tcp/4002/ws","/ip6/2604:1380:4601:f600::1/udp/4001/quic"]}

{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWRNijznEQoXrxBeNLb2TqbSFm8gG8jKtfEsbC1C9nPqce","Addrs":["/ip4/147.75.87.211/tcp/4001","/ip4/147.75.87.211/tcp/4002/ws","/ip4/147.75.87.211/udp/4001/quic","/ip6/2604:1380:4601:f600::3/tcp/4001","/ip6/2604:1380:4601:f600::3/tcp/4002/ws","/ip6/2604:1380:4601:f600::3/udp/4001/quic"]}

* Operation timed out after 1001 milliseconds with 1378 bytes received
* Closing connection 0
curl: (28) Operation timed out after 1001 milliseconds with 1378 bytes received
```

## License

206 changes: 49 additions & 157 deletions caskadht.go
@@ -2,13 +2,9 @@ package cascadht

import (
"context"
"encoding/json"
"io"
"mime"
"net"
"net/http"
"path"
"strings"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipns"
@@ -19,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
)

@@ -31,39 +26,18 @@ var (
cascadeMetadata = varint.ToUvarint(uint64(multicodec.TransportBitswap))
)

type (
Caskadht struct {
*options
std *dht.IpfsDHT
acc *fullrt.FullRT
s *http.Server
type Caskadht struct {
*options
std *dht.IpfsDHT
acc *fullrt.FullRT
s *http.Server

// Context and cancellation used to terminate streaming responses on shutdown.
ctx context.Context
cancel context.CancelFunc
}

response struct {
MultihashResults []MultihashResult
}
MultihashResult struct {
Multihash multihash.Multihash
ProviderResults []ProviderResult
}
ProviderResult struct {
ContextID []byte
Metadata []byte
Provider peer.AddrInfo
}
)

const (
ipfsProtocolPrefix = "/ipfs"
// Context and cancellation used to terminate streaming responses on shutdown.
ctx context.Context
cancel context.CancelFunc
}

mediaTypeNDJson = "application/x-ndjson"
mediaTypeJson = "application/json"
mediaTypeAny = "*/*"
)
const ipfsProtocolPrefix = "/ipfs"

func New(o ...Option) (*Caskadht, error) {
opts, err := newOptions(o...)
@@ -118,6 +92,7 @@ func (c *Caskadht) Start(ctx context.Context) error {
func (c *Caskadht) serveMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/multihash/", c.handleMhSubtree)
mux.HandleFunc("/routing/v1/providers/", c.handleRoutingV1ProvidersSubtree)
mux.HandleFunc("/ready", c.handleReady)
mux.HandleFunc("/", c.handleCatchAll)
return mux
@@ -126,150 +101,67 @@ func (c *Caskadht) serveMux() *http.ServeMux {
func (c *Caskadht) handleMhSubtree(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
c.handleGetMh(w, r)
c.handleLookup(newIPNILookupResponseWriter(w), r)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
}
}

func (c *Caskadht) handleGetMh(w http.ResponseWriter, r *http.Request) {
discardBody(r)

smh := strings.TrimPrefix(path.Base(r.URL.Path), "multihash/")
logger := logger.With("mh", smh)
mh, err := multihash.FromB58String(smh)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

okNDJson, okJson, err := acceptsJson(r)
if err != nil {
logger.Debugw("Failed to check accepted response media type", "err", err)
http.Error(w, "invalid Accept header", http.StatusBadRequest)
return
}
flusher, okFlusher := w.(http.Flusher)
if !okFlusher && !okJson && okNDJson {
// Respond with error if the request only accepts ndjson and the server does not support
// streaming.
http.Error(w, "server does not support streaming response", http.StatusBadRequest)
return
}
if !okJson && !okNDJson {
http.Error(w, "media type not supported", http.StatusBadRequest)
return
func (c *Caskadht) handleRoutingV1ProvidersSubtree(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
c.handleLookup(newDelegatedRoutingLookupResponseWriter(w), r)
case http.MethodPut:
discardBody(r)
http.Error(w, "", http.StatusNotImplemented)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
}
}

ctx, cancel := context.WithCancel(r.Context())
key := cid.NewCidV1(cid.Raw, mh)
pch := c.cascadeFindProviders(ctx, key)
// TODO: Decide response media type based on `q` weighting param in Accept.
// For now always prefer streaming responses.
if !okFlusher && okJson {
// We cannot stream results and the client accepts JSON; respond with non-streaming JSON.
res := MultihashResult{
Multihash: mh,
}
JSON_LOOP:
for {
select {
case <-c.ctx.Done():
break JSON_LOOP
case provider, ok := <-pch:
if !ok {
break JSON_LOOP
}
res.ProviderResults = append(res.ProviderResults, ProviderResult{
ContextID: cascadeContextID,
Metadata: cascadeMetadata,
Provider: provider,
})
}
}
cancel()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response{
MultihashResults: []MultihashResult{res},
}); err != nil {
logger.Errorw("failed to write provider results", "count", len(res.ProviderResults), "err", err)
func (c *Caskadht) handleLookup(w lookupResponseWriter, r *http.Request) {
if err := w.Accept(r); err != nil {
switch e := err.(type) {
case errHttpResponse:
e.WriteTo(w)
default:
logger.Errorw("Failed to accept lookup request", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}
return
}

w.Header().Set("Content-Type", mediaTypeNDJson)
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("X-Content-Type-Options", "nosniff")
encoder := json.NewEncoder(w)
var count int

NDJSON_LOOP:
ctx, cancel := context.WithCancel(r.Context())
pch := c.cascadeFindProviders(ctx, w.Key())
defer cancel()
LOOP:
for {
select {
case <-c.ctx.Done():
break NDJSON_LOOP
logger.Debugw("Interrupted while responding to lookup", "key", w.Key(), "err", c.ctx.Err())
break LOOP
case provider, ok := <-pch:
if !ok {
break NDJSON_LOOP
}
// TODO: Restructure response once there is a more optimal way to stream results.
// See: https://github.com/ipni/specs/issues/8
if err := encoder.Encode(response{
MultihashResults: []MultihashResult{
{
Multihash: mh,
ProviderResults: []ProviderResult{
{
ContextID: cascadeContextID,
Metadata: cascadeMetadata,
Provider: provider,
},
},
},
},
}); err != nil {
logger.Errorw("Failed to encode ndjson response", "err", err)
break NDJSON_LOOP
logger.Debugw("No more provider records", "key", w.Key())
break LOOP
}
if _, err := w.Write(newline); err != nil {
logger.Errorw("Failed to encode ndjson response", "err", err)
break NDJSON_LOOP
if err := w.WriteProviderRecord(providerRecord{AddrInfo: provider}); err != nil {
logger.Errorw("Failed to encode provider record", "err", err)
break LOOP
}
flusher.Flush()
count++
}
}
cancel()
logger.Debugw("Finished streaming results", "count", count)
if count == 0 {
http.Error(w, "", http.StatusNotFound)
}
}

func acceptsJson(r *http.Request) (ndjson bool, json bool, err error) {
accepts := r.Header.Values("Accept")
var mt string
for _, accept := range accepts {
mt, _, err = mime.ParseMediaType(accept)
if err != nil {
return
}
switch mt {
case mediaTypeNDJson:
ndjson = true
case mediaTypeJson:
json = true
case mediaTypeAny:
ndjson = true
json = true
}
if json && ndjson {
// Return early if both is supported.
return
if err := w.Close(); err != nil {
switch e := err.(type) {
case errHttpResponse:
e.WriteTo(w)
default:
logger.Errorw("Failed to finalize lookup results", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}
}
return
}

func (c *Caskadht) cascadeFindProviders(ctx context.Context, key cid.Cid) <-chan peer.AddrInfo {
16 changes: 16 additions & 0 deletions errors.go
@@ -0,0 +1,16 @@
package cascadht

import "net/http"

type errHttpResponse struct {
message string
status int
}

func (e errHttpResponse) Error() string {
return e.message
}

func (e errHttpResponse) WriteTo(w http.ResponseWriter) {
http.Error(w, e.message, e.status)
}