diff --git a/comm.go b/comm.go index d0d40da5..8e31ff75 100644 --- a/comm.go +++ b/comm.go @@ -30,7 +30,7 @@ func (p *PubSub) getHelloPacket() *RPC { } func (p *PubSub) handleNewStream(s network.Stream) { - r := ggio.NewDelimitedReader(s, 1<<20) + r := ggio.NewDelimitedReader(s, maxRPCSize) for { rpc := new(RPC) err := r.ReadMsg(&rpc.RPC) @@ -85,7 +85,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { - r := ggio.NewDelimitedReader(s, 1<<20) + r := ggio.NewDelimitedReader(s, maxRPCSize) rpc := new(RPC) for { err := r.ReadMsg(&rpc.RPC) diff --git a/floodsub.go b/floodsub.go index 0d57c991..447f0d2e 100644 --- a/floodsub.go +++ b/floodsub.go @@ -70,11 +70,20 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } + if out.Size() > maxRPCSize { + log.Errorf( + "dropping RPC outbound message that's too large: %d > %d", + out.Size(), + maxRPCSize, + ) + continue + } + select { case mch <- out: default: - log.Infof("dropping message to peer %s: queue full", pid) // Drop it. The peer is too slow. + log.Infof("dropping message to peer %s: queue full", pid) } } } diff --git a/gossipsub.go b/gossipsub.go index 4aaf345e..c8b341c6 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -342,15 +342,25 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { return } - select { - case mch <- out: - default: - log.Infof("dropping message to peer %s: queue full", p) - // push control messages that need to be retried - ctl := out.GetControl() - if ctl != nil { - gs.pushControl(p, ctl) + if out.Size() > maxRPCSize { + log.Errorf( + "dropping RPC outbound message that's too large: %d > %d", + out.Size(), + maxRPCSize, + ) + } else { + select { + case mch <- out: + return + default: } + log.Infof("dropping message to peer %s: queue full", p) + } + + // push control messages that need to be retried + ctl = out.GetControl() + if ctl != nil { + gs.pushControl(p, ctl) } } diff --git a/pubsub.go b/pubsub.go index 6df169d8..19649b4d 100644 --- a/pubsub.go +++ b/pubsub.go @@ -25,6 +25,13 @@ var ( TimeCacheDuration = 120 * time.Second ) +const ( + /// maxMessageSize is the maximum size of outbound message. + maxMessageSize = 1 << 20 // 1MiB + // maxRPCSize is the maximum size of any RPC protobuf. + maxRPCSize = maxMessageSize + (64 * 1024) // a message + 64 KiB. +) + var log = logging.Logger("pubsub") // PubSub is the implementation of the pubsub system. @@ -696,7 +703,12 @@ func (p *PubSub) GetTopics() []string { } // Publish publishes data to the given topic. +// +// The message data must be less than the maximum message size, 1MiB. func (p *PubSub) Publish(topic string, data []byte) error { + if len(data) > maxMessageSize { + return fmt.Errorf("message too large: %d > %d", len(data), maxMessageSize) + } seqno := p.nextSeqno() m := &pb.Message{ Data: data,