Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial prototype of http over libp2p #2218

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"net/http"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
Expand Down Expand Up @@ -123,6 +124,8 @@ type Config struct {

DisableMetrics bool
PrometheusRegisterer prometheus.Registerer

HTTPHandler http.Handler
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -306,6 +309,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
RelayServiceOpts: cfg.RelayServiceOpts,
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
HTTPHandler: cfg.HTTPHandler,
})
if err != nil {
swrm.Close()
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"net/http"
"reflect"
"time"

Expand Down Expand Up @@ -574,3 +575,10 @@ func PrometheusRegisterer(reg prometheus.Registerer) Option {
return nil
}
}

func WithHTTPHandler(h http.Handler) Option {
return func(cfg *Config) error {
cfg.HTTPHandler = h
return nil
}
}
18 changes: 18 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"net/http"
"sync"
"time"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
"github.com/libp2p/go-libp2p/p2p/host/relaysvc"
p2phttp "github.com/libp2p/go-libp2p/p2p/http"
inat "github.com/libp2p/go-libp2p/p2p/net/nat"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
Expand Down Expand Up @@ -100,6 +102,8 @@ type BasicHost struct {
caBook peerstore.CertifiedAddrBook

autoNat autonat.AutoNAT

httpHandler http.Handler
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -159,6 +163,8 @@ type HostOpts struct {
EnableMetrics bool
// PrometheusRegisterer is the PrometheusRegisterer used for metrics
PrometheusRegisterer prometheus.Registerer

HTTPHandler http.Handler
}

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
Expand Down Expand Up @@ -188,6 +194,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
ctx: hostCtx,
ctxCancel: cancel,
disableSignedPeerRecord: opts.DisableSignedPeerRecord,
httpHandler: opts.HTTPHandler,
}

h.updateLocalIpAddr()
Expand Down Expand Up @@ -382,9 +389,20 @@ func (h *BasicHost) Start() {
h.psManager.Start()
h.refCount.Add(1)
h.ids.Start()
if h.httpHandler != nil {
h.registerHTTPHandler()
}
go h.background()
}

func (h *BasicHost) registerHTTPHandler() {
const libp2pHttpProtoID = "/libp2p-http"
h.SetStreamHandler(libp2pHttpProtoID, func(s network.Stream) {
p2phttp.ServeReadWriter(s, h.httpHandler)
s.Close()
})
}

// newStreamHandler is the remote-opened stream handler for network.Network
// TODO: this feels a bit wonky
func (h *BasicHost) newStreamHandler(s network.Stream) {
Expand Down
146 changes: 146 additions & 0 deletions p2p/http/responsewriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package p2phttp

import (
"bufio"
"fmt"
"io"
"net/http"
"strconv"
"sync"

logging "github.com/ipfs/go-log/v2"
)

var bufWriterPool = sync.Pool{
New: func() any {
return bufio.NewWriterSize(nil, 4<<10)
},
}

var bufReaderPool = sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, 4<<10)
},
}
Comment on lines +14 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably doesn't matter (and definitely not for a prototype): What about creating a new reader / writer, but taking the underlying the byte slice from our shared buffer pool? That's an additional alloc for the bufio.{Reader/Writer}, but we get better byte slice reuse.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get to reuse the whole thing here which is probably net better for the GC.


var log = logging.Logger("p2phttp")

var _ http.ResponseWriter = (*httpResponseWriter)(nil)

type httpResponseWriter struct {
w *bufio.Writer
directWriter io.Writer
header http.Header
wroteHeader bool
inferredContentLength bool
}

// Header implements http.ResponseWriter
func (w *httpResponseWriter) Header() http.Header {
return w.header
}

// Write implements http.ResponseWriter
func (w *httpResponseWriter) Write(b []byte) (int, error) {
if !w.wroteHeader {
if w.header.Get("Content-Type") == "" {
contentType := http.DetectContentType(b)
w.header.Set("Content-Type", contentType)
}

if w.w.Available() > len(b) {
return w.w.Write(b)
}
}

// Ran out of buffered space, We should check if we need to write the headers.
if !w.wroteHeader {
// Be nice for small things
if w.header.Get("Content-Length") == "" {
w.inferredContentLength = true
w.header.Set("Content-Length", strconv.Itoa(len(b)+w.w.Buffered()))
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
}

// If WriteHeader has not yet been called, Write calls
// WriteHeader(http.StatusOK) before writing the data.
w.WriteHeader(http.StatusOK)
}

if w.inferredContentLength {
log.Error("Tried to infer content length, but another write happened, so content length is wrong and headers are already written. This response may fail to parse by clients")
}

return w.w.Write(b)
}

func (w *httpResponseWriter) flush() {
if !w.wroteHeader {
// Be nice for small things
if w.header.Get("Content-Length") == "" {
w.inferredContentLength = true
w.header.Set("Content-Length", strconv.Itoa(w.w.Buffered()))
}

// If WriteHeader has not yet been called, Write calls
// WriteHeader(http.StatusOK) before writing the data.
w.WriteHeader(http.StatusOK)
w.w.Flush()
}
}

// WriteHeader implements http.ResponseWriter
func (w *httpResponseWriter) WriteHeader(statusCode int) {
if w.wroteHeader {
log.Errorf("multiple WriteHeader calls dropping %d", statusCode)
return
}
w.wroteHeader = true
w.writeStatusLine(statusCode)
w.header.Write(w.directWriter)
w.directWriter.Write([]byte("\r\n"))
}

// Copied from Go stdlib https://cs.opensource.google/go/go/+/refs/tags/go1.20.2:src/net/http/server.go;drc=ea4631cc0cf301c824bd665a7980c13289ab5c9d;l=1533
func (w *httpResponseWriter) writeStatusLine(code int) {
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
// Stack allocated
scratch := [4]byte{}
// Always HTTP/1.1
w.directWriter.Write([]byte("HTTP/1.1 "))

if text := http.StatusText(code); text != "" {
w.directWriter.Write(strconv.AppendInt(scratch[:0], int64(code), 10))
w.directWriter.Write([]byte(" "))
w.directWriter.Write([]byte(text))
w.directWriter.Write([]byte("\r\n"))
} else {
// don't worry about performance
fmt.Fprintf(w.directWriter, "%03d status code %d\r\n", code, code)
}
}

func ServeReadWriter(rw io.ReadWriter, handler http.Handler) {
r := bufReaderPool.Get().(*bufio.Reader)
r.Reset(rw)
defer bufReaderPool.Put(r)

buffedWriter := bufWriterPool.Get().(*bufio.Writer)
buffedWriter.Reset(rw)
defer bufWriterPool.Put(buffedWriter)
w := httpResponseWriter{
w: buffedWriter,
directWriter: rw,
header: make(http.Header),
}
defer w.w.Flush()

req, err := http.ReadRequest(r)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.w.Flush()
log.Errorf("error reading request: %s", err)
return
}

handler.ServeHTTP(&w, req)
w.flush()
}
63 changes: 63 additions & 0 deletions p2p/http/responsewriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package p2phttp

import (
"bufio"
"bytes"
"fmt"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"
)

func TestResponseLooksCorrect(t *testing.T) {
req, err := http.NewRequest("GET", "http://localhost/", bytes.NewReader([]byte("")))
require.NoError(t, err)
reqBuf := bytes.Buffer{}
req.Write(&reqBuf)

resp := bytes.Buffer{}
respWriter := bufio.NewWriter(&resp)
s := bufio.NewReadWriter(bufio.NewReader(&reqBuf), respWriter)

ServeReadWriter(s, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello world"))
}))

respWriter.Flush()
fmt.Println("Resp is", resp.String())
parsedResponse, err := http.ReadResponse(bufio.NewReader(&resp), nil)
require.NoError(t, err)
fmt.Println("Parsed response is", parsedResponse)
}

func TestMultipleWritesButSmallResponseLooksCorrect(t *testing.T) {
req, err := http.NewRequest("GET", "http://localhost/", bytes.NewReader([]byte("")))
require.NoError(t, err)
reqBuf := bytes.Buffer{}
req.Write(&reqBuf)

resp := bytes.Buffer{}
respWriter := bufio.NewWriter(&resp)
s := bufio.NewReadWriter(bufio.NewReader(&reqBuf), respWriter)

ServeReadWriter(s, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello world 1 "))
w.Write([]byte("2 "))
w.Write([]byte("3 "))
w.Write([]byte("4 "))
w.Write([]byte("5 "))
w.Write([]byte("6 "))
}))

respWriter.Flush()
fmt.Println("Resp is", resp.String())
parsedResponse, err := http.ReadResponse(bufio.NewReader(&resp), nil)
require.NoError(t, err)
respBody, err := io.ReadAll(parsedResponse.Body)
require.NoError(t, err)
require.Equal(t, "Hello world 1 2 3 4 5 6 ", string(respBody))
require.Equal(t, len("Hello world 1 2 3 4 5 6 "), int(parsedResponse.ContentLength))
fmt.Println("Parsed response is", parsedResponse)
}