diff --git a/cmd/main.go b/cmd/main.go index b6c0665b..a0d5fa6d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -290,7 +290,7 @@ Stacks may include: initRepoPath = initCmd.Flag("repo-dir", "Specify a custom repository path").Short('r').String() initIpfsServerMode = initCmd.Flag("server", "Apply IPFS server profile").Bool() initIpfsSwarmPorts = initCmd.Flag("swarm-ports", "Set the swarm ports (TCP,WS). A random TCP port is chosen by default").String() - initLogFiles = initCmd.Flag("log-files", "If true, writes logs to rolling files, if false, writes logs to stdout").Default("true").Bool() + initLogFiles = initCmd.Flag("log-files", "If true, writes logs to rolling files, if false, writes logs to stdout").Bool() initApiBindAddr = initCmd.Flag("api-bind-addr", "Set the local API address").Default("127.0.0.1:40600").String() initCafeApiBindAddr = initCmd.Flag("cafe-bind-addr", "Set the cafe REST API address").Default("0.0.0.0:40601").String() initGatewayBindAddr = initCmd.Flag("gateway-bind-addr", "Set the IPFS gateway address").Default("127.0.0.1:5050").String() diff --git a/common/version.go b/common/version.go index 21e2f1e6..b952cc18 100644 --- a/common/version.go +++ b/common/version.go @@ -4,4 +4,4 @@ package common var GitCommit, GitBranch, GitState, GitSummary, BuildDate string // Version is the current application's version literal -const Version = "0.2.3" +const Version = "0.2.4" diff --git a/core/cafe_api_v0.go b/core/cafe_api_v0.go index 5c02ac75..9fe6aa47 100644 --- a/core/cafe_api_v0.go +++ b/core/cafe_api_v0.go @@ -165,7 +165,7 @@ func (c *cafeApi) service(g *gin.Context) { log.Debugf("responding with %s to %s", rpmes.Message.Type.String(), mPeer.Pretty()) g.JSON(http.StatusOK, rpmes) - g.Writer.Write([]byte("\n")) + _, _ = g.Writer.Write([]byte("\n")) } return true }) diff --git a/core/cafe_inbox.go b/core/cafe_inbox.go index b54249e3..e9d7ba74 100644 --- a/core/cafe_inbox.go +++ b/core/cafe_inbox.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "sync" "github.com/golang/protobuf/proto" @@ -141,31 +142,31 @@ func (q *CafeInbox) batch(msgs []pb.CafeMessage) error { func (q *CafeInbox) handle(msg pb.CafeMessage) error { pid, err := peer.IDB58Decode(msg.Peer) if err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error decoding msg peer: %s", err), msg) } // download the actual message ciphertext, err := ipfs.DataAtPath(q.node(), msg.Id) if err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error getting msg data: %s", err), msg) } envb, err := crypto.Decrypt(q.node().PrivateKey, ciphertext) if err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error decrypting msg: %s", err), msg) } env := new(pb.Envelope) if err := proto.Unmarshal(envb, env); err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error unmarshaling env: %s", err), msg) } if err := q.threadsService().service.VerifyEnvelope(env, pid); err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error verifying env: %s", err), msg) } // pass to thread service for normal handling if _, err := q.threadsService().Handle(pid, env); err != nil { - return q.handleErr(err, msg) + return q.handleErr(fmt.Errorf("error handling msg: %s", err), msg) } return nil } diff --git a/core/cafe_service.go b/core/cafe_service.go index 0b98ab00..d7ea1903 100644 --- a/core/cafe_service.go +++ b/core/cafe_service.go @@ -379,8 +379,7 @@ func (h *CafeService) notifyClient(pid peer.ID) error { if err != nil { return err } - - return ipfs.Publish(h.service.Node(), client, payload, time.Second*5) + return ipfs.Publish(h.service.Node(), client, payload) } // sendCafeRequest sends an authenticated request, retrying once after a session refresh @@ -752,7 +751,7 @@ func (h *CafeService) publishQuery(req *pb.PubSubQuery) error { if err != nil { return err } - return ipfs.Publish(h.service.Node(), topic, payload, 0) + return ipfs.Publish(h.service.Node(), topic, payload) } // handleChallenge receives a challenge request @@ -1395,9 +1394,7 @@ func (h *CafeService) handlePubSubQuery(pid peer.ID, env *pb.Envelope) (*pb.Enve if err != nil { return nil, err } - // allow some time for the receiver to collect the response after a connect - timeout := time.Duration(int(query.Timeout*2/3) * 1e9) - if err := ipfs.Publish(h.service.Node(), query.Topic, payload, timeout); err != nil { + if err := ipfs.Publish(h.service.Node(), query.Topic, payload); err != nil { return nil, err } } diff --git a/core/config.go b/core/config.go index 90e89214..a3b410f4 100644 --- a/core/config.go +++ b/core/config.go @@ -90,6 +90,7 @@ func applySwarmPortConfigOption(rep repo.Repo, ports string) error { ws = parts[1] default: tcp = GetRandomPort() + ws = GetRandomPort() } list := []string{ @@ -121,7 +122,7 @@ func ensureMobileConfig(repoPath string) error { conf.Swarm.ConnMgr.HighWater = 500 conf.Swarm.ConnMgr.GracePeriod = (time.Second * 20).String() conf.Swarm.DisableBandwidthMetrics = true - conf.Swarm.EnableAutoRelay = false + conf.Swarm.EnableAutoRelay = true return rep.SetConfig(conf) } @@ -141,7 +142,7 @@ func ensureServerConfig(repoPath string) error { conf.Addresses.NoAnnounce = config.DefaultServerFilters conf.Swarm.AddrFilters = config.DefaultServerFilters conf.Swarm.DisableNatPortMap = true - conf.Swarm.EnableRelayHop = false + conf.Swarm.EnableRelayHop = true conf.Swarm.EnableAutoNATService = true // tmp. ensure IPFS addresses are available in case we need to diff --git a/go.mod b/go.mod index a1ba256c..adacca7e 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,9 @@ require ( github.com/gogo/protobuf v1.2.1 github.com/golang/protobuf v1.3.1 github.com/ipfs/go-cid v0.0.2 - github.com/ipfs/go-ipfs v0.4.21-rc3 + github.com/ipfs/go-ipfs v0.4.21 github.com/ipfs/go-ipfs-addr v0.0.1 - github.com/ipfs/go-ipfs-cmds v0.0.7 + github.com/ipfs/go-ipfs-cmds v0.0.8 github.com/ipfs/go-ipfs-config v0.0.3 github.com/ipfs/go-ipfs-files v0.0.3 github.com/ipfs/go-ipld-format v0.0.2 @@ -65,3 +65,5 @@ require ( gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 ) + +replace github.com/ipfs/go-ipfs => github.com/sanderpick/go-ipfs v0.4.22-0.20190606034924-0478a0eca246 diff --git a/go.sum b/go.sum index 74138d08..7c055544 100644 --- a/go.sum +++ b/go.sum @@ -244,8 +244,8 @@ github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr github.com/ipfs/go-ds-measure v0.0.1/go.mod h1:wiH6bepKsgyNKpz3nyb4erwhhIVpIxnZbsjN1QpVbbE= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y= -github.com/ipfs/go-ipfs v0.4.21-rc3 h1:GeSJJ9+r6nRHlaQNvUB2yAZ2aUQcJs7zuCa1cpMVCuM= -github.com/ipfs/go-ipfs v0.4.21-rc3/go.mod h1:kh51IS+/hCyR4DLH6+821iiIs/7413OK7ZPRtX4oWkM= +github.com/ipfs/go-ipfs v0.4.21 h1:KB4k3U90cesx60MwHEOqUoSCquZ+JXXHNdw0HIKBusc= +github.com/ipfs/go-ipfs v0.4.21/go.mod h1:T9zAmGO+rzbvLjDUm0DHtYXtdT4GhWOZeCPztgmt2V8= github.com/ipfs/go-ipfs-addr v0.0.1 h1:DpDFybnho9v3/a1dzJ5KnWdThWD1HrFLpQ+tWIyBaFI= github.com/ipfs/go-ipfs-addr v0.0.1/go.mod h1:uKTDljHT3Q3SUWzDLp3aYUi8MrY32fgNgogsIa0npjg= github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc= @@ -254,8 +254,8 @@ github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IW github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw= github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw= -github.com/ipfs/go-ipfs-cmds v0.0.7 h1:0N2NXxYAZn1kHpHrZMHZYRcVGJSxQogDD89oKc0GZMg= -github.com/ipfs/go-ipfs-cmds v0.0.7/go.mod h1:E5ou2OpwkAtR8LdneNdq4w1vPcrTWvh/6WPhjxGaX/Y= +github.com/ipfs/go-ipfs-cmds v0.0.8 h1:ZMo0ZeQOr10ZKY4yxYA3lRHUbnF/ZYcV9cpU0IrlGFI= +github.com/ipfs/go-ipfs-cmds v0.0.8/go.mod h1:TiK4e7/V31tuEb8YWDF8lN3qrnDH+BS7ZqWIeYJlAs8= github.com/ipfs/go-ipfs-config v0.0.1 h1:6ED08emzI1imdsAjixFi2pEyZxTVD5ECKtCOxLBx+Uc= github.com/ipfs/go-ipfs-config v0.0.1/go.mod h1:KDbHjNyg4e6LLQSQpkgQMBz6Jf4LXiWAcmnkcwmH0DU= github.com/ipfs/go-ipfs-config v0.0.3 h1:Ep4tRdP1iVK76BgOprD9B/qtOEdpno+1Xb57BqydgGk= @@ -670,6 +670,7 @@ github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd h1:CmH9+J6ZSsIjUK3dcGsnCnO41eRBOnY12zwkn5qVwgc= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= +github.com/sanderpick/go-ipfs v0.4.22-0.20190606034924-0478a0eca246/go.mod h1:T9zAmGO+rzbvLjDUm0DHtYXtdT4GhWOZeCPztgmt2V8= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/ksuid v1.0.2 h1:9yBfKyw4ECGTdALaF09Snw3sLJmYIX6AbPJrAy6MrDc= github.com/segmentio/ksuid v1.0.2/go.mod h1:BXuJDr2byAiHuQaQtSKoXh1J0YmUDurywOXgB2w+OSU= diff --git a/ipfs/pubsub.go b/ipfs/pubsub.go index 96505a82..6c871f68 100644 --- a/ipfs/pubsub.go +++ b/ipfs/pubsub.go @@ -17,20 +17,12 @@ import ( const PublishTimeout = time.Second * 5 // Publish publishes data to a topic -func Publish(node *core.IpfsNode, topic string, data []byte, connectTimeout time.Duration) error { +func Publish(node *core.IpfsNode, topic string, data []byte) error { api, err := coreapi.NewCoreAPI(node) if err != nil { return err } - if connectTimeout > 0 { - cctx, cancel := context.WithTimeout(node.Context(), connectTimeout) - defer cancel() - if err := connectToTopicReceiver(node, cctx, topic); err != nil { - return err - } - } - ctx, pcancel := context.WithTimeout(node.Context(), PublishTimeout) defer pcancel() diff --git a/repo/config/init_ipfs.go b/repo/config/init_ipfs.go index 4580ad52..8bb310e4 100644 --- a/repo/config/init_ipfs.go +++ b/repo/config/init_ipfs.go @@ -38,6 +38,8 @@ var DefaultServerFilters = []string{ var TextileBootstrapAddresses = []string{ "/ip4/18.144.12.135/tcp/4001/ipfs/12D3KooWGBW3LfzypK3zgV4QxdPyUm3aEuwBDMKRRpCPm9FrJvar", // us-west-1a "/ip4/13.57.23.210/tcp/4001/ipfs/12D3KooWQue2dSRqnZTVvikoxorZQ5Qyyug3hV65rYnWYpYsNMRE", // us-west-1c + "/ip4/13.56.163.77/tcp/4001/ipfs/12D3KooWFrrmGJcQhE5h6VUvUEXdLH7gPKdWh2q4CEM62rFGcFpr", // us-west-beta + "/ip4/52.53.127.155/tcp/4001/ipfs/12D3KooWGN8VAsPHsHeJtoTbbzsGjs2LTmQZ6wFKvuPich1TYmYY", // us-west-dev "/ip4/18.221.167.133/tcp/4001/ipfs/12D3KooWERmHT6g4YkrPBTmhfDLjfi8b662vFCfvBXqzcdkPGQn1", // us-east-2a "/ip4/18.224.173.65/tcp/4001/ipfs/12D3KooWLh9Gd4C3knv4XqCyCuaNddfEoSLXgekVJzRyC5vsjv5d", // us-east-2b "/ip4/35.180.16.103/tcp/4001/ipfs/12D3KooWDhSfXZCBVAK6SNQu7h6mfGCBJtjMS44PW5YA5YCjVmjB", // eu-west-3a @@ -146,7 +148,7 @@ func InitIpfs(identity native.Identity, mobile bool, server bool) (*native.Confi }, DisableBandwidthMetrics: mobile, DisableNatPortMap: server, - EnableRelayHop: false, + EnableRelayHop: server, EnableAutoRelay: mobile, EnableAutoNATService: server, }, @@ -164,7 +166,7 @@ func InitIpfs(identity native.Identity, mobile bool, server bool) (*native.Confi } func addressesConfig(server bool) native.Addresses { - var noAnnounce []string + noAnnounce := make([]string, 0) if server { noAnnounce = DefaultServerFilters } diff --git a/service/main.go b/service/main.go index 26ba36b9..cdb3cd93 100644 --- a/service/main.go +++ b/service/main.go @@ -90,6 +90,7 @@ func (srv *Service) Ping(p peer.ID) (PeerStatus, error) { } if _, err := srv.SendRequest(p, env); err != nil { + log.Errorf("ping error: %s", err) return PeerOffline, nil } @@ -113,9 +114,9 @@ func (srv *Service) SendRequest(p peer.ID, pmes *pb.Envelope) (*pb.Envelope, err return nil, err } + _ = srv.updateFromMessage(ctx, p) + if rpmes == nil { - err := fmt.Errorf("no response from %s", p.Pretty()) - log.Debug(err.Error()) return nil, err } @@ -172,8 +173,6 @@ func (srv *Service) SendHTTPRequest(addr string, pmes *pb.Envelope) (*pb.Envelop } if rpmes.Message == nil { - err := fmt.Errorf("no response from %s", addr) - log.Debug(err.Error()) return nil, err } @@ -243,8 +242,6 @@ func (srv *Service) SendHTTPStreamRequest(addr string, pmes *pb.Envelope) (chan } if rpmes == nil || rpmes.Message == nil { - err := fmt.Errorf("no response from %s", addr) - log.Debug(err.Error()) errCh <- err return } @@ -276,11 +273,7 @@ func (srv *Service) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Envelop return err } - if err := ms.SendMessage(ctx, pmes); err != nil { - return err - } - - return nil + return ms.SendMessage(ctx, pmes) } // SendHTTPMessage sends a message over HTTP @@ -374,9 +367,7 @@ func (srv *Service) VerifyEnvelope(env *pb.Envelope, pid peer.ID) error { // handleError receives an error response func (srv *Service) handleError(env *pb.Envelope) error { if env.Message.Payload == nil && env.Message.Type != pb.Message_PONG { - err := fmt.Errorf("message payload with type %s is nil", env.Message.Type.String()) - log.Error(err.Error()) - return err + return fmt.Errorf("message payload with type %s is nil", env.Message.Type.String()) } if env.Message.Type != pb.Message_ERROR { @@ -408,11 +399,6 @@ func (srv *Service) handlePing(pid peer.ID, env *pb.Envelope) (*pb.Envelope, err var dhtReadMessageTimeout = time.Minute var ErrReadTimeout = fmt.Errorf("timed out reading response") -type bufferedWriteCloser interface { - ggio.WriteCloser - Flush() error -} - // The Protobuf writer performs multiple small writes when writing a message. // We need to buffer those writes, to make sure that we're not sending a new // packet for every single write. @@ -421,12 +407,26 @@ type bufferedDelimitedWriter struct { ggio.WriteCloser } -func newBufferedDelimitedWriter(str io.Writer) bufferedWriteCloser { - w := bufio.NewWriter(str) - return &bufferedDelimitedWriter{ - Writer: w, - WriteCloser: ggio.NewDelimitedWriter(w), +var writerPool = sync.Pool{ + New: func() interface{} { + w := bufio.NewWriter(nil) + return &bufferedDelimitedWriter{ + Writer: w, + WriteCloser: ggio.NewDelimitedWriter(w), + } + }, +} + +func writeMsg(w io.Writer, mes *pb.Envelope) error { + bw := writerPool.Get().(*bufferedDelimitedWriter) + bw.Reset(w) + err := bw.WriteMsg(mes) + if err == nil { + err = bw.Flush() } + bw.Reset(nil) + writerPool.Put(bw) + return err } func (w *bufferedDelimitedWriter) Flush() error { @@ -435,39 +435,37 @@ func (w *bufferedDelimitedWriter) Flush() error { // handleNewStream implements the inet.StreamHandler func (srv *Service) handleNewStream(s inet.Stream) { - go srv.handleNewMessage(s) + defer s.Reset() + if srv.handleNewMessage(s) { + // Gracefully close the stream for writes. + _ = s.Close() + } } -func (srv *Service) handleNewMessage(s inet.Stream) { +func (srv *Service) handleNewMessage(s inet.Stream) bool { ctx := srv.Node().Context() + cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) - w := newBufferedDelimitedWriter(cw) mPeer := s.Conn().RemotePeer() for { - select { - // end loop on context close - case <-srv.Node().Context().Done(): - return - default: - } - - // receive msg - pmes := new(pb.Envelope) - switch err := r.ReadMsg(pmes); err { + var pmes pb.Envelope + switch err := r.ReadMsg(&pmes); err { case io.EOF: - s.Close() - return - case nil: + return true default: - s.Reset() - log.Debugf("error unmarshaling data: %s", err) - return + // This string test is necessary because there isn't a single stream reset error + // instance in use. + if err.Error() != "stream reset" { + log.Debugf("error reading message: %s", err) + } + return false + case nil: } - if err := srv.VerifyEnvelope(pmes, mPeer); err != nil { + if err := srv.VerifyEnvelope(&pmes, mPeer); err != nil { log.Warningf("error verifying message: %s", err) continue } @@ -477,18 +475,20 @@ func (srv *Service) handleNewMessage(s inet.Stream) { if handler == nil { // get service specific handler handler = srv.handler.Handle - // TODO: handle stream requests over p2p? } log.Debugf("received %s from %s", pmes.Message.Type.String(), mPeer.Pretty()) - rpmes, err := handler(mPeer, pmes) + rpmes, err := handler(mPeer, &pmes) if err != nil { - s.Reset() - log.Errorf("%s handle message error: %s", pmes.Message.Type.String(), err) - return + log.Warningf("error handling message %s: %s", pmes.Message.Type.String(), err) + return false + } + + err = srv.updateFromMessage(ctx, mPeer) + if err != nil { + log.Warningf("error updating from: %s", err) } - // if nil response, return it before serializing if rpmes == nil { continue } @@ -497,14 +497,10 @@ func (srv *Service) handleNewMessage(s inet.Stream) { log.Debugf("responding with %s to %s", rpmes.Message.Type.String(), mPeer.Pretty()) // send out response msg - err = w.WriteMsg(rpmes) - if err == nil { - err = w.Flush() - } + err = writeMsg(cw, rpmes) if err != nil { - s.Reset() - log.Errorf("send response error: %s", err) - return + log.Debugf("error writing response: %s", err) + return false } } } @@ -545,12 +541,14 @@ func (srv *Service) listen(tag string) { } pmes := new(pb.Envelope) - if err := proto.Unmarshal(msg.Data(), pmes); err != nil { - log.Errorf("error unmarshaling pubsub message data from %s: %s", mPeer.Pretty(), err) + err := proto.Unmarshal(msg.Data(), pmes) + if err != nil { + log.Warningf("error unmarshaling pubsub message data from %s: %s", mPeer.Pretty(), err) continue } - if err := srv.VerifyEnvelope(pmes, mPeer); err != nil { + err = srv.VerifyEnvelope(pmes, mPeer) + if err != nil { log.Warningf("error verifying message: %s", err) continue } @@ -565,7 +563,7 @@ func (srv *Service) listen(tag string) { log.Debugf("received pubsub %s from %s", pmes.Message.Type.String(), mPeer.Pretty()) rpmes, err := handler(mPeer, pmes) if err != nil { - log.Errorf("%s handle message error: %s", pmes.Message.Type.String(), err) + log.Warningf("error handling message %s: %s", pmes.Message.Type.String(), err) continue } @@ -578,8 +576,9 @@ func (srv *Service) listen(tag string) { log.Debugf("responding with %s to %s", rpmes.Message.Type.String(), mPeer.Pretty()) ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) - if err := srv.SendMessage(ctx, mPeer, rpmes); err != nil { - log.Errorf("error sending message response to %s: %s", mPeer, err) + err = srv.SendMessage(ctx, mPeer, rpmes) + if err != nil { + log.Warningf("error sending message response to %s: %s", mPeer, err) } cancel() } diff --git a/service/sender.go b/service/sender.go index 74d40e79..8d380a80 100644 --- a/service/sender.go +++ b/service/sender.go @@ -9,23 +9,16 @@ import ( ggio "github.com/gogo/protobuf/io" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" - protocol "github.com/libp2p/go-libp2p-protocol" "github.com/textileio/go-textile/pb" ) -type messageSender struct { - s inet.Stream - r ggio.ReadCloser - w bufferedWriteCloser - lk sync.Mutex - p peer.ID - - srv *Service - pt protocol.ID - reqs map[int32]chan *pb.Envelope - - invalid bool - singleMes int +func (srv *Service) updateFromMessage(ctx context.Context, p peer.ID) error { + // Make sure that this node is actually a DHT server, not just a client. + protos, err := srv.Node().Peerstore.SupportsProtocols(p, string(srv.handler.Protocol())) + if err == nil && len(protos) > 0 { + srv.Node().DHT.Update(ctx, p) + } + return nil } func (srv *Service) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) { @@ -35,12 +28,7 @@ func (srv *Service) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa srv.smlk.Unlock() return ms, nil } - ms = &messageSender{ - p: p, - srv: srv, - pt: srv.handler.Protocol(), - reqs: make(map[int32]chan *pb.Envelope, 2), - } + ms = &messageSender{p: p, srv: srv} srv.strmap[p] = ms srv.smlk.Unlock() @@ -65,13 +53,25 @@ func (srv *Service) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa return ms, nil } +type messageSender struct { + s inet.Stream + r ggio.ReadCloser + lk sync.Mutex + p peer.ID + + srv *Service + + invalid bool + singleMes int +} + // invalidate is called before this messageSender is removed from the strmap. // It prevents the messageSender from being reused/reinitialized and then // forgotten (leaving the stream open). func (ms *messageSender) invalidate() { ms.invalid = true if ms.s != nil { - ms.s.Reset() + _ = ms.s.Reset() ms.s = nil } } @@ -94,13 +94,12 @@ func (ms *messageSender) prep(ctx context.Context) error { return nil } - nstr, err := ms.srv.Node().PeerHost.NewStream(ctx, ms.p, ms.pt) + nstr, err := ms.srv.Node().PeerHost.NewStream(ctx, ms.p, ms.srv.handler.Protocol()) if err != nil { return err } ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) - ms.w = newBufferedDelimitedWriter(nstr) ms.s = nstr return nil @@ -121,17 +120,16 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Envelope) err } if err := ms.writeMsg(pmes); err != nil { - ms.s.Reset() + _ = ms.s.Reset() ms.s = nil if retry { log.Info("error writing message, bailing: ", err) return err - } else { - log.Info("error writing message, trying again: ", err) - retry = true - continue } + log.Info("error writing message, trying again: ", err) + retry = true + continue } if ms.singleMes > streamReuseTries { @@ -155,22 +153,21 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Envelope) (*p } if err := ms.writeMsg(pmes); err != nil { - ms.s.Reset() + _ = ms.s.Reset() ms.s = nil if retry { log.Info("error writing message, bailing: ", err) return nil, err - } else { - log.Info("error writing message, trying again: ", err) - retry = true - continue } + log.Info("error writing message, trying again: ", err) + retry = true + continue } mes := new(pb.Envelope) if err := ms.ctxReadMsg(ctx, mes); err != nil { - ms.s.Reset() + _ = ms.s.Reset() ms.s = nil if retry { @@ -195,10 +192,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Envelope) (*p } func (ms *messageSender) writeMsg(pmes *pb.Envelope) error { - if err := ms.w.WriteMsg(pmes); err != nil { - return err - } - return ms.w.Flush() + return writeMsg(ms.s, pmes) } func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Envelope) error {