From 83d06db83715ad0c30d6898657c46133306627c4 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 24 Mar 2023 18:06:33 -0700 Subject: [PATCH 1/2] Add initial prototype of http over libp2p --- config/config.go | 4 + options.go | 8 ++ p2p/host/basic/basic_host.go | 18 ++++ p2p/http/responsewriter.go | 146 ++++++++++++++++++++++++++++++++ p2p/http/responsewriter_test.go | 32 +++++++ 5 files changed, 208 insertions(+) create mode 100644 p2p/http/responsewriter.go create mode 100644 p2p/http/responsewriter_test.go diff --git a/config/config.go b/config/config.go index 70686f7634..8f15e8d035 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "errors" "fmt" + "net/http" "time" "github.com/libp2p/go-libp2p/core/connmgr" @@ -123,6 +124,8 @@ type Config struct { DisableMetrics bool PrometheusRegisterer prometheus.Registerer + + HTTPHandler http.Handler } func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) { @@ -306,6 +309,7 @@ func (cfg *Config) NewNode() (host.Host, error) { RelayServiceOpts: cfg.RelayServiceOpts, EnableMetrics: !cfg.DisableMetrics, PrometheusRegisterer: cfg.PrometheusRegisterer, + HTTPHandler: cfg.HTTPHandler, }) if err != nil { swrm.Close() diff --git a/options.go b/options.go index 1809ec44ba..769d4f028d 100644 --- a/options.go +++ b/options.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "errors" "fmt" + "net/http" "reflect" "time" @@ -574,3 +575,10 @@ func PrometheusRegisterer(reg prometheus.Registerer) Option { return nil } } + +func WithHTTPHandler(h http.Handler) Option { + return func(cfg *Config) error { + cfg.HTTPHandler = h + return nil + } +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index bee068fd9c..42d0cacec0 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "net/http" "sync" "time" @@ -22,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" "github.com/libp2p/go-libp2p/p2p/host/relaysvc" + p2phttp "github.com/libp2p/go-libp2p/p2p/http" inat "github.com/libp2p/go-libp2p/p2p/net/nat" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" @@ -100,6 +102,8 @@ type BasicHost struct { caBook peerstore.CertifiedAddrBook autoNat autonat.AutoNAT + + httpHandler http.Handler } var _ host.Host = (*BasicHost)(nil) @@ -159,6 +163,8 @@ type HostOpts struct { EnableMetrics bool // PrometheusRegisterer is the PrometheusRegisterer used for metrics PrometheusRegisterer prometheus.Registerer + + HTTPHandler http.Handler } // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. @@ -188,6 +194,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { ctx: hostCtx, ctxCancel: cancel, disableSignedPeerRecord: opts.DisableSignedPeerRecord, + httpHandler: opts.HTTPHandler, } h.updateLocalIpAddr() @@ -382,9 +389,20 @@ func (h *BasicHost) Start() { h.psManager.Start() h.refCount.Add(1) h.ids.Start() + if h.httpHandler != nil { + h.registerHTTPHandler() + } go h.background() } +func (h *BasicHost) registerHTTPHandler() { + const libp2pHttpProtoID = "/libp2p-http" + h.SetStreamHandler(libp2pHttpProtoID, func(s network.Stream) { + p2phttp.ServeReadWriter(s, h.httpHandler) + s.Close() + }) +} + // newStreamHandler is the remote-opened stream handler for network.Network // TODO: this feels a bit wonky func (h *BasicHost) newStreamHandler(s network.Stream) { diff --git a/p2p/http/responsewriter.go b/p2p/http/responsewriter.go new file mode 100644 index 0000000000..c57bfb630c --- /dev/null +++ b/p2p/http/responsewriter.go @@ -0,0 +1,146 @@ +package p2phttp + +import ( + "bufio" + "fmt" + "io" + "net/http" + "strconv" + "sync" + + logging "github.com/ipfs/go-log/v2" +) + +var bufWriterPool = sync.Pool{ + New: func() any { + return bufio.NewWriterSize(nil, 4<<10) + }, +} + +var bufReaderPool = sync.Pool{ + New: func() any { + return bufio.NewReaderSize(nil, 4<<10) + }, +} + +var log = logging.Logger("p2phttp") + +var _ http.ResponseWriter = (*httpResponseWriter)(nil) + +type httpResponseWriter struct { + w *bufio.Writer + directWriter io.Writer + header http.Header + wroteHeader bool + inferredContentLength bool +} + +// Header implements http.ResponseWriter +func (w *httpResponseWriter) Header() http.Header { + return w.header +} + +// Write implements http.ResponseWriter +func (w *httpResponseWriter) Write(b []byte) (int, error) { + if !w.wroteHeader { + if w.header.Get("Content-Type") == "" { + contentType := http.DetectContentType(b) + w.header.Set("Content-Type", contentType) + } + + if w.w.Available() > len(b) { + return w.w.Write(b) + } + } + + // Ran out of buffered space, We should check if we need to write the headers. + if !w.wroteHeader { + // Be nice for small things + if w.header.Get("Content-Length") == "" { + w.inferredContentLength = true + w.header.Set("Content-Length", strconv.Itoa(len(b)+w.w.Buffered())) + } + + // If WriteHeader has not yet been called, Write calls + // WriteHeader(http.StatusOK) before writing the data. + w.WriteHeader(http.StatusOK) + } + + if w.inferredContentLength { + log.Error("Tried to infer content length, but another write happened, so content length is wrong and headers are already written. This response may fail to parse by clients") + } + + return w.w.Write(b) +} + +func (w *httpResponseWriter) flush() { + if !w.wroteHeader { + // Be nice for small things + if w.header.Get("Content-Length") == "" { + w.inferredContentLength = true + w.header.Set("Content-Length", strconv.Itoa(w.w.Buffered())) + } + + // If WriteHeader has not yet been called, Write calls + // WriteHeader(http.StatusOK) before writing the data. + w.WriteHeader(http.StatusOK) + w.w.Flush() + } +} + +// WriteHeader implements http.ResponseWriter +func (w *httpResponseWriter) WriteHeader(statusCode int) { + if w.wroteHeader { + log.Errorf("multiple WriteHeader calls dropping %d", statusCode) + return + } + w.wroteHeader = true + w.writeStatusLine(statusCode) + w.header.Write(w.directWriter) + w.directWriter.Write([]byte("\r\n")) +} + +// Copied from Go stdlib https://cs.opensource.google/go/go/+/refs/tags/go1.20.2:src/net/http/server.go;drc=ea4631cc0cf301c824bd665a7980c13289ab5c9d;l=1533 +func (w *httpResponseWriter) writeStatusLine(code int) { + // Stack allocated + scratch := [4]byte{} + // Always HTTP/1.1 + w.directWriter.Write([]byte("HTTP/1.1 ")) + + if text := http.StatusText(code); text != "" { + w.directWriter.Write(strconv.AppendInt(scratch[:0], int64(code), 10)) + w.directWriter.Write([]byte(" ")) + w.directWriter.Write([]byte(text)) + w.directWriter.Write([]byte("\r\n")) + } else { + // don't worry about performance + fmt.Fprintf(w.directWriter, "%03d status code %d\r\n", code, code) + } +} + +func ServeReadWriter(rw io.ReadWriter, handler http.Handler) { + r := bufReaderPool.Get().(*bufio.Reader) + r.Reset(rw) + defer bufReaderPool.Put(r) + + buffedWriter := bufWriterPool.Get().(*bufio.Writer) + buffedWriter.Reset(rw) + defer bufWriterPool.Put(buffedWriter) + w := httpResponseWriter{ + w: buffedWriter, + directWriter: rw, + header: make(http.Header), + } + defer w.w.Flush() + + req, err := http.ReadRequest(r) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.w.Flush() + log.Errorf("error reading request: %s", err) + return + } + + handler.ServeHTTP(&w, req) + w.flush() +} diff --git a/p2p/http/responsewriter_test.go b/p2p/http/responsewriter_test.go new file mode 100644 index 0000000000..b30a318082 --- /dev/null +++ b/p2p/http/responsewriter_test.go @@ -0,0 +1,32 @@ +package p2phttp + +import ( + "bufio" + "bytes" + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestResponseLooksCorrect(t *testing.T) { + req, err := http.NewRequest("GET", "http://localhost/", bytes.NewReader([]byte(""))) + require.NoError(t, err) + reqBuf := bytes.Buffer{} + req.Write(&reqBuf) + + resp := bytes.Buffer{} + respWriter := bufio.NewWriter(&resp) + s := bufio.NewReadWriter(bufio.NewReader(&reqBuf), respWriter) + + ServeReadWriter(s, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Hello world")) + })) + + respWriter.Flush() + fmt.Println("Resp is", resp.String()) + parsedResponse, err := http.ReadResponse(bufio.NewReader(&resp), nil) + require.NoError(t, err) + fmt.Println("Parsed response is", parsedResponse) +} From c029fca236cf3cfd9e8fa2f6fac8f914d55c9398 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 24 Mar 2023 21:52:01 -0700 Subject: [PATCH 2/2] Add test for multiple writes --- p2p/http/responsewriter_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/p2p/http/responsewriter_test.go b/p2p/http/responsewriter_test.go index b30a318082..e64e158012 100644 --- a/p2p/http/responsewriter_test.go +++ b/p2p/http/responsewriter_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "net/http" "testing" @@ -30,3 +31,33 @@ func TestResponseLooksCorrect(t *testing.T) { require.NoError(t, err) fmt.Println("Parsed response is", parsedResponse) } + +func TestMultipleWritesButSmallResponseLooksCorrect(t *testing.T) { + req, err := http.NewRequest("GET", "http://localhost/", bytes.NewReader([]byte(""))) + require.NoError(t, err) + reqBuf := bytes.Buffer{} + req.Write(&reqBuf) + + resp := bytes.Buffer{} + respWriter := bufio.NewWriter(&resp) + s := bufio.NewReadWriter(bufio.NewReader(&reqBuf), respWriter) + + ServeReadWriter(s, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Hello world 1 ")) + w.Write([]byte("2 ")) + w.Write([]byte("3 ")) + w.Write([]byte("4 ")) + w.Write([]byte("5 ")) + w.Write([]byte("6 ")) + })) + + respWriter.Flush() + fmt.Println("Resp is", resp.String()) + parsedResponse, err := http.ReadResponse(bufio.NewReader(&resp), nil) + require.NoError(t, err) + respBody, err := io.ReadAll(parsedResponse.Body) + require.NoError(t, err) + require.Equal(t, "Hello world 1 2 3 4 5 6 ", string(respBody)) + require.Equal(t, len("Hello world 1 2 3 4 5 6 "), int(parsedResponse.ContentLength)) + fmt.Println("Parsed response is", parsedResponse) +}