From 7f3d7e61b4c6dd115e80eb0c9118cdeebd903ea1 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 14 Feb 2023 08:58:46 +0800 Subject: [PATCH 01/27] limit the number of batch requests to 100 --- rpc/handler.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rpc/handler.go b/rpc/handler.go index c2e7d7dc08c6..346d8152b842 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -172,6 +172,14 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } + if len(msgs) > 100 { + h.startCallProc(func(cp *callProc) { + resp := errorMessage(&invalidRequestError{"batch too large"}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + return + } + // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range msgs { From 667a4088d22ada3878a500deab16dd665824a1d8 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 14 Feb 2023 10:40:21 +0800 Subject: [PATCH 02/27] limit the size of the response packet to 10MB --- rpc/errors.go | 4 +++- rpc/handler.go | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/rpc/errors.go b/rpc/errors.go index 7188332d551e..36d2c82a73c7 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -61,12 +61,14 @@ const ( errcodeDefault = -32000 errcodeNotificationsUnsupported = -32001 errcodeTimeout = -32002 + errcodeResponseTooLarge = -32003 errcodePanic = -32603 errcodeMarshalError = -32603 ) const ( - errMsgTimeout = "request timed out" + errMsgTimeout = "request timed out" + errMsgResponseTooLarge = "response too large" ) type methodNotFoundError struct{ method string } diff --git a/rpc/handler.go b/rpc/handler.go index 346d8152b842..45fbb1ed48f7 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -149,6 +149,21 @@ func (b *batchCallBuffer) timeout(ctx context.Context, conn jsonWriter) { b.doWrite(ctx, conn, true) } +// responseTooLarge sends the responses added so far. For the remaining unanswered call +// messages, it sends a response too large error response. +func (b *batchCallBuffer) responseTooLarge(ctx context.Context, conn jsonWriter) { + b.mutex.Lock() + defer b.mutex.Unlock() + + for _, msg := range b.calls { + if !msg.isNotification() { + resp := msg.errorResponse(&internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge}) + b.resp = append(b.resp, resp) + } + } + b.doWrite(ctx, conn, true) +} + // doWrite actually writes the response. // This assumes b.mutex is held. func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorResponse bool) { @@ -211,6 +226,8 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) } + resBytes := 0 + maxBytes := 10 * 1000 * 1000 for { // No need to handle rest of calls if timed out. if cp.ctx.Err() != nil { @@ -222,6 +239,12 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } resp := h.handleCallMsg(cp, msg) callBuffer.pushResponse(resp) + if resp != nil { + if resBytes += len(resp.Result); resBytes > maxBytes { + callBuffer.responseTooLarge(cp.ctx, h.conn) + break + } + } } if timer != nil { timer.Stop() From 21aec8f8e05c1ec199a2f46f44f94ce354370399 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 14 Feb 2023 21:03:13 +0800 Subject: [PATCH 03/27] add batch limit related config --- rpc/handler.go | 31 ++++++++++++++++--------------- rpc/server.go | 5 +++++ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/rpc/handler.go b/rpc/handler.go index 45fbb1ed48f7..ec390e084c1c 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -49,17 +49,19 @@ import ( // h.removeRequestOp(op) // timeout, etc. // } type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool + batchRequestLimit int + batchResponseMaxSize int subLock sync.Mutex serverSubs map[ID]*Subscription @@ -187,7 +189,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } - if len(msgs) > 100 { + if len(msgs) > h.batchRequestLimit && h.batchRequestLimit != 0 { h.startCallProc(func(cp *callProc) { resp := errorMessage(&invalidRequestError{"batch too large"}) h.conn.writeJSON(cp.ctx, resp, true) @@ -227,7 +229,6 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } resBytes := 0 - maxBytes := 10 * 1000 * 1000 for { // No need to handle rest of calls if timed out. if cp.ctx.Err() != nil { @@ -239,8 +240,8 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } resp := h.handleCallMsg(cp, msg) callBuffer.pushResponse(resp) - if resp != nil { - if resBytes += len(resp.Result); resBytes > maxBytes { + if resp != nil && h.batchResponseMaxSize != 0 { + if resBytes += len(resp.Result); resBytes > h.batchResponseMaxSize { callBuffer.responseTooLarge(cp.ctx, h.conn) break } diff --git a/rpc/server.go b/rpc/server.go index 9c72c26d7b94..29e83c9bd748 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -39,6 +39,9 @@ const ( // OptionSubscriptions is an indication that the codec supports RPC notifications OptionSubscriptions = 1 << iota // support pub sub + + BatchRequestLimit = 100 + BatchResponseMaxSize = 10 * 1000 * 1000 // 10MB ) // Server is an RPC server. @@ -120,6 +123,8 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { h := newHandler(ctx, codec, s.idgen, &s.services) h.allowSubscribe = false + h.batchRequestLimit = BatchRequestLimit + h.batchResponseMaxSize = BatchResponseMaxSize defer h.close(io.EOF, nil) reqs, batch, err := codec.readBatch() From 6b8b39dcbf5941dc6725c48dfa29de0fc711409a Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 14 Feb 2023 21:16:24 +0800 Subject: [PATCH 04/27] update doc --- rpc/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/server.go b/rpc/server.go index 29e83c9bd748..04bd8cf1dcdb 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -40,8 +40,8 @@ const ( // OptionSubscriptions is an indication that the codec supports RPC notifications OptionSubscriptions = 1 << iota // support pub sub - BatchRequestLimit = 100 - BatchResponseMaxSize = 10 * 1000 * 1000 // 10MB + BatchRequestLimit = 100 // Maximum number of requests in a batch + BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) ) // Server is an RPC server. From c4ac65c082578b16843012b43dfbc1e84461e929 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 14 Feb 2023 21:29:29 +0800 Subject: [PATCH 05/27] apply limit for server & client --- rpc/handler.go | 27 +++++++++++++++++---------- rpc/server.go | 5 ----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/rpc/handler.go b/rpc/handler.go index ec390e084c1c..9d7d0088fc56 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -28,6 +28,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +const ( + BatchRequestLimit = 100 // Maximum number of requests in a batch + BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) +) + // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. @@ -75,16 +80,18 @@ type callProc struct { func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), + batchRequestLimit: BatchRequestLimit, + batchResponseMaxSize: BatchResponseMaxSize, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) diff --git a/rpc/server.go b/rpc/server.go index 04bd8cf1dcdb..9c72c26d7b94 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -39,9 +39,6 @@ const ( // OptionSubscriptions is an indication that the codec supports RPC notifications OptionSubscriptions = 1 << iota // support pub sub - - BatchRequestLimit = 100 // Maximum number of requests in a batch - BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) ) // Server is an RPC server. @@ -123,8 +120,6 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { h := newHandler(ctx, codec, s.idgen, &s.services) h.allowSubscribe = false - h.batchRequestLimit = BatchRequestLimit - h.batchResponseMaxSize = BatchResponseMaxSize defer h.close(io.EOF, nil) reqs, batch, err := codec.readBatch() From c9015faa953985fae5b14fca5774aaf7f42aadef Mon Sep 17 00:00:00 2001 From: mmsqe Date: Wed, 15 Feb 2023 22:49:10 +0800 Subject: [PATCH 06/27] make batch related limit configurable --- cmd/clef/main.go | 1 + cmd/geth/main.go | 2 ++ cmd/utils/flags.go | 18 ++++++++++++++++++ node/api.go | 4 ++-- node/config.go | 6 ++++++ node/node.go | 13 +++++++------ node/rpcstack.go | 6 ++++-- node/rpcstack_test.go | 4 ++-- rpc/client.go | 2 +- rpc/handler.go | 11 +++-------- rpc/server.go | 16 ++++++++++++---- 11 files changed, 58 insertions(+), 25 deletions(-) diff --git a/cmd/clef/main.go b/cmd/clef/main.go index 2788ddc33b9e..1ee78642d7a1 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -732,6 +732,7 @@ func signer(c *cli.Context) error { cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name)) srv := rpc.NewServer() + srv.SetBatchLimits(utils.BatchRequestLimit.Value, utils.BatchResponseMaxSize.Value) err := node.RegisterApis(rpcAPI, []string{"account"}, srv) if err != nil { utils.Fatalf("Could not register API: %w", err) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5ba070249897..6efc1932fd4a 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -180,6 +180,8 @@ var ( utils.RPCGlobalEVMTimeoutFlag, utils.RPCGlobalTxFeeCapFlag, utils.AllowUnprotectedTxs, + utils.BatchRequestLimit, + utils.BatchResponseMaxSize, } metricsFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 08de71ee831b..f4e9f7917737 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -786,6 +786,16 @@ var ( Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC", Category: flags.APICategory, } + BatchRequestLimit = &cli.IntFlag{ + Name: "batch.request-limit", + Usage: "Maximum number of requests in a batch", + Value: 100, + } + BatchResponseMaxSize = &cli.IntFlag{ + Name: "batch.response-max-size", + Usage: "Maximum number of bytes returned from calls (10MB)", + Value: 10 * 1000 * 1000, + } EnablePersonal = &cli.BoolFlag{ Name: "rpc.enabledeprecatedpersonal", Usage: "Enables the (deprecated) personal namespace", @@ -1209,6 +1219,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) { if ctx.IsSet(AllowUnprotectedTxs.Name) { cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name) } + + if ctx.IsSet(BatchRequestLimit.Name) { + cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name) + } + + if ctx.IsSet(BatchResponseMaxSize.Name) { + cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name) + } } // setGraphQL creates the GraphQL listener interface string from the set diff --git a/node/api.go b/node/api.go index 15892a270b66..dd6a208b0734 100644 --- a/node/api.go +++ b/node/api.go @@ -199,7 +199,7 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri if err := api.node.http.setListenAddr(*host, *port); err != nil { return false, err } - if err := api.node.http.enableRPC(api.node.rpcAPIs, config); err != nil { + if err := api.node.http.enableRPC(api.node.rpcAPIs, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil { return false, err } if err := api.node.http.start(); err != nil { @@ -270,7 +270,7 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap return false, err } openApis, _ := api.node.getAPIs() - if err := server.enableWS(openApis, config); err != nil { + if err := server.enableWS(openApis, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil { return false, err } if err := server.start(); err != nil { diff --git a/node/config.go b/node/config.go index 37a7d5837fa2..684e032cfeea 100644 --- a/node/config.go +++ b/node/config.go @@ -197,6 +197,12 @@ type Config struct { // AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC. AllowUnprotectedTxs bool `toml:",omitempty"` + // BatchRequestLimit is the maximum number of requests in a batch. + BatchRequestLimit int `toml:",omitempty"` + + // BatchResponseMaxSize is the maximum number of bytes returned from calls (10MB). + BatchResponseMaxSize int `toml:",omitempty"` + // JWTSecret is the path to the hex-encoded jwt secret. JWTSecret string `toml:",omitempty"` diff --git a/node/node.go b/node/node.go index 112a771ab090..37086d293309 100644 --- a/node/node.go +++ b/node/node.go @@ -101,10 +101,11 @@ func New(conf *Config) (*Node, error) { if strings.HasSuffix(conf.Name, ".ipc") { return nil, errors.New(`Config.Name cannot end in ".ipc"`) } - + server := rpc.NewServer() + server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize) node := &Node{ config: conf, - inprocHandler: rpc.NewServer(), + inprocHandler: server, eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), @@ -412,7 +413,7 @@ func (n *Node) startRPC() error { Vhosts: n.config.HTTPVirtualHosts, Modules: n.config.HTTPModules, prefix: n.config.HTTPPathPrefix, - }); err != nil { + }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { return err } servers = append(servers, server) @@ -428,7 +429,7 @@ func (n *Node) startRPC() error { Modules: n.config.WSModules, Origins: n.config.WSOrigins, prefix: n.config.WSPathPrefix, - }); err != nil { + }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { return err } servers = append(servers, server) @@ -447,7 +448,7 @@ func (n *Node) startRPC() error { Modules: DefaultAuthModules, prefix: DefaultAuthPrefix, jwtSecret: secret, - }); err != nil { + }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { return err } servers = append(servers, server) @@ -461,7 +462,7 @@ func (n *Node) startRPC() error { Origins: DefaultAuthOrigins, prefix: DefaultAuthPrefix, jwtSecret: secret, - }); err != nil { + }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { return err } servers = append(servers, server) diff --git a/node/rpcstack.go b/node/rpcstack.go index 97d591642c09..bc65d65b43d7 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -287,7 +287,7 @@ func (h *httpServer) doStop() { } // enableRPC turns on JSON-RPC over HTTP on the server. -func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { +func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, batchRequestLimit, batchResponseMaxSize int) error { h.mu.Lock() defer h.mu.Unlock() @@ -297,6 +297,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { // Create RPC server and handler. srv := rpc.NewServer() + srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } @@ -319,7 +320,7 @@ func (h *httpServer) disableRPC() bool { } // enableWS turns on JSON-RPC over WebSocket on the server. -func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { +func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, batchRequestLimit, batchResponseMaxSize int) error { h.mu.Lock() defer h.mu.Unlock() @@ -328,6 +329,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { } // Create RPC server and handler. srv := rpc.NewServer() + srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } diff --git a/node/rpcstack_test.go b/node/rpcstack_test.go index 4d10e61e2dec..f6fd14cd8f42 100644 --- a/node/rpcstack_test.go +++ b/node/rpcstack_test.go @@ -242,9 +242,9 @@ func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsCon timeouts = &rpc.DefaultHTTPTimeouts } srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), *timeouts) - assert.NoError(t, srv.enableRPC(apis(), *conf)) + assert.NoError(t, srv.enableRPC(apis(), *conf, 0, 0)) if ws { - assert.NoError(t, srv.enableWS(nil, *wsConf)) + assert.NoError(t, srv.enableWS(nil, *wsConf, 0, 0)) } assert.NoError(t, srv.setListenAddr("localhost", 0)) assert.NoError(t, srv.start()) diff --git a/rpc/client.go b/rpc/client.go index a509cb2e0fa0..853f55fab732 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -114,7 +114,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - handler := newHandler(ctx, conn, c.idgen, c.services) + handler := newHandler(ctx, conn, c.idgen, c.services, 0, 0) return &clientConn{conn, handler} } diff --git a/rpc/handler.go b/rpc/handler.go index 9d7d0088fc56..1bd6f656f8d9 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -28,11 +28,6 @@ import ( "github.com/ethereum/go-ethereum/log" ) -const ( - BatchRequestLimit = 100 // Maximum number of requests in a batch - BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) -) - // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. @@ -77,7 +72,7 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit, batchResponseMaxSize int) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ reg: reg, @@ -90,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * allowSubscribe: true, serverSubs: make(map[ID]*Subscription), log: log.Root(), - batchRequestLimit: BatchRequestLimit, - batchResponseMaxSize: BatchResponseMaxSize, + batchRequestLimit: batchRequestLimit, + batchResponseMaxSize: batchResponseMaxSize, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) diff --git a/rpc/server.go b/rpc/server.go index 9c72c26d7b94..4731bf36106c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -46,9 +46,11 @@ type Server struct { services serviceRegistry idgen func() ID - mutex sync.Mutex - codecs map[ServerCodec]struct{} - run int32 + mutex sync.Mutex + codecs map[ServerCodec]struct{} + run int32 + batchRequestLimit int + batchResponseMaxSize int } // NewServer creates a new server instance with no registered handlers. @@ -65,6 +67,12 @@ func NewServer() *Server { return server } +// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls +func (s *Server) SetBatchLimits(limit int, size int) { + s.batchRequestLimit = limit + s.batchResponseMaxSize = size +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the @@ -118,7 +126,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { return } - h := newHandler(ctx, codec, s.idgen, &s.services) + h := newHandler(ctx, codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize) h.allowSubscribe = false defer h.close(io.EOF, nil) From 2c04aa0cd0cb9c907477b0de812f0f284fc1ecc2 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Thu, 16 Feb 2023 12:13:26 +0800 Subject: [PATCH 07/27] add SetBatchLimits for client with default limit --- cmd/utils/flags.go | 14 ++++++++------ rpc/client.go | 21 ++++++++++++++++++++- rpc/server.go | 1 + 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f4e9f7917737..d0d3da8ebc50 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -787,14 +787,16 @@ var ( Category: flags.APICategory, } BatchRequestLimit = &cli.IntFlag{ - Name: "batch.request-limit", - Usage: "Maximum number of requests in a batch", - Value: 100, + Name: "batch.request-limit", + Usage: "Maximum number of requests in a batch", + Value: rpc.BatchRequestLimit, + Category: flags.APICategory, } BatchResponseMaxSize = &cli.IntFlag{ - Name: "batch.response-max-size", - Usage: "Maximum number of bytes returned from calls (10MB)", - Value: 10 * 1000 * 1000, + Name: "batch.response-max-size", + Usage: "Maximum number of bytes returned from calls (10MB)", + Value: rpc.BatchResponseMaxSize, + Category: flags.APICategory, } EnablePersonal = &cli.BoolFlag{ Name: "rpc.enabledeprecatedpersonal", diff --git a/rpc/client.go b/rpc/client.go index 853f55fab732..d62dbc65ea5b 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -44,6 +44,10 @@ const ( // Timeouts defaultDialTimeout = 10 * time.Second // used if context has no deadline subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls + + // Batch limits + BatchRequestLimit = 100 // Maximum number of requests in a batch + BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) ) const ( @@ -99,6 +103,9 @@ type Client struct { reqInit chan *requestOp // register response IDs, takes write lock reqSent chan error // signals write completion, releases write lock reqTimeout chan *requestOp // removes response IDs when call timeout expires + + batchRequestLimit int + batchResponseMaxSize int } type reconnectFunc func(context.Context) (ServerCodec, error) @@ -114,10 +121,22 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - handler := newHandler(ctx, conn, c.idgen, c.services, 0, 0) + if c.batchRequestLimit == 0 { + c.batchRequestLimit = BatchRequestLimit + } + if c.batchResponseMaxSize == 0 { + c.batchResponseMaxSize = BatchResponseMaxSize + } + handler := newHandler(ctx, conn, c.idgen, c.services, c.batchRequestLimit, c.batchResponseMaxSize) return &clientConn{conn, handler} } +// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls +func (c *Client) SetBatchLimits(limit int, size int) { + c.batchRequestLimit = limit + c.batchResponseMaxSize = size +} + func (cc *clientConn) close(err error, inflightReq *requestOp) { cc.handler.close(err, inflightReq) cc.codec.close() diff --git a/rpc/server.go b/rpc/server.go index 4731bf36106c..74b01dda253a 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -95,6 +95,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { defer s.untrackCodec(codec) c := initClient(codec, s.idgen, &s.services) + c.SetBatchLimits(s.batchRequestLimit, s.batchResponseMaxSize) <-codec.closed() c.Close() } From a43fda5daea6f17a8c78c5c38fc456e279431f4e Mon Sep 17 00:00:00 2001 From: mmsqe Date: Thu, 16 Feb 2023 17:55:14 +0800 Subject: [PATCH 08/27] rename namespace --- cmd/utils/flags.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d0d3da8ebc50..e85f982b311f 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -787,13 +787,13 @@ var ( Category: flags.APICategory, } BatchRequestLimit = &cli.IntFlag{ - Name: "batch.request-limit", + Name: "rpc.batch-request-limit", Usage: "Maximum number of requests in a batch", Value: rpc.BatchRequestLimit, Category: flags.APICategory, } BatchResponseMaxSize = &cli.IntFlag{ - Name: "batch.response-max-size", + Name: "rpc.batch-response-max-size", Usage: "Maximum number of bytes returned from calls (10MB)", Value: rpc.BatchResponseMaxSize, Category: flags.APICategory, From d7c86739c42dcf257e503cd85c946182b849a69e Mon Sep 17 00:00:00 2001 From: mmsqe Date: Thu, 16 Feb 2023 21:19:16 +0800 Subject: [PATCH 09/27] allow set limit with dial after client get init --- rpc/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rpc/client.go b/rpc/client.go index d62dbc65ea5b..279cc1fd7575 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -135,6 +135,10 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { func (c *Client) SetBatchLimits(limit int, size int) { c.batchRequestLimit = limit c.batchResponseMaxSize = size + select { + case c.reconnected <- c.writeConn.(ServerCodec): + default: + } } func (cc *clientConn) close(err error, inflightReq *requestOp) { From 7fd2b776c164507f7e07f44300a2c9104cc73ffa Mon Sep 17 00:00:00 2001 From: mmsqe Date: Thu, 16 Feb 2023 21:20:23 +0800 Subject: [PATCH 10/27] set limit when init client --- rpc/client.go | 37 ++++++++++++++++++++++--------------- rpc/server.go | 5 ++--- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 279cc1fd7575..d71605599557 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -132,7 +132,8 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { } // SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -func (c *Client) SetBatchLimits(limit int, size int) { +// And update non-http connection with limit +func (c *Client) SetBatchLimits(limit, size int) { c.batchRequestLimit = limit c.batchResponseMaxSize = size select { @@ -254,22 +255,24 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) return c, nil } -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { +func initClientWithBatchLimits(conn ServerCodec, idgen func() ID, services *serviceRegistry, limit, size int) *Client { _, isHTTP := conn.(*httpConn) c := &Client{ - isHTTP: isHTTP, - idgen: idgen, - services: services, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), + isHTTP: isHTTP, + idgen: idgen, + services: services, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + batchRequestLimit: limit, + batchResponseMaxSize: size, } if !isHTTP { go c.dispatch(conn) @@ -277,6 +280,10 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C return c } +func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { + return initClientWithBatchLimits(conn, idgen, services, 0, 0) +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the diff --git a/rpc/server.go b/rpc/server.go index 74b01dda253a..e0835ff9570c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -68,7 +68,7 @@ func NewServer() *Server { } // SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -func (s *Server) SetBatchLimits(limit int, size int) { +func (s *Server) SetBatchLimits(limit, size int) { s.batchRequestLimit = limit s.batchResponseMaxSize = size } @@ -94,8 +94,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { } defer s.untrackCodec(codec) - c := initClient(codec, s.idgen, &s.services) - c.SetBatchLimits(s.batchRequestLimit, s.batchResponseMaxSize) + c := initClientWithBatchLimits(codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize) <-codec.closed() c.Close() } From 733910cbced9b66e070dbc23e2418e40c1c107e7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 17 Feb 2023 18:21:22 +0100 Subject: [PATCH 11/27] rpc: configure client batch limits through options It's not really possible to change these values in a running client without recreating the connection, and I suspect users will rarely want to do that. --- cmd/utils/flags.go | 4 +-- rpc/client.go | 61 ++++++++++++++++++++-------------------------- rpc/client_opt.go | 29 ++++++++++++++++++++++ rpc/client_test.go | 3 ++- rpc/http.go | 2 +- rpc/inproc.go | 3 ++- rpc/ipc.go | 3 ++- rpc/server.go | 32 +++++++++++++++--------- rpc/stdio.go | 3 ++- rpc/websocket.go | 4 +-- 10 files changed, 90 insertions(+), 54 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e85f982b311f..d94e6053c0bb 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -789,13 +789,13 @@ var ( BatchRequestLimit = &cli.IntFlag{ Name: "rpc.batch-request-limit", Usage: "Maximum number of requests in a batch", - Value: rpc.BatchRequestLimit, + Value: rpc.DefaultBatchRequestLimit, Category: flags.APICategory, } BatchResponseMaxSize = &cli.IntFlag{ Name: "rpc.batch-response-max-size", Usage: "Maximum number of bytes returned from calls (10MB)", - Value: rpc.BatchResponseMaxSize, + Value: rpc.DefaultBatchResponseMaxSize, Category: flags.APICategory, } EnablePersonal = &cli.BoolFlag{ diff --git a/rpc/client.go b/rpc/client.go index d71605599557..6dcb5d97f957 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -46,8 +46,8 @@ const ( subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls // Batch limits - BatchRequestLimit = 100 // Maximum number of requests in a batch - BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) + DefaultBatchRequestLimit = 100 // Maximum number of requests in a batch + DefaultBatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) ) const ( @@ -88,6 +88,10 @@ type Client struct { // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc + // config fields + batchItemLimit int + batchResponseMaxSize int + // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. @@ -103,9 +107,6 @@ type Client struct { reqInit chan *requestOp // register response IDs, takes write lock reqSent chan error // signals write completion, releases write lock reqTimeout chan *requestOp // removes response IDs when call timeout expires - - batchRequestLimit int - batchResponseMaxSize int } type reconnectFunc func(context.Context) (ServerCodec, error) @@ -121,27 +122,10 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - if c.batchRequestLimit == 0 { - c.batchRequestLimit = BatchRequestLimit - } - if c.batchResponseMaxSize == 0 { - c.batchResponseMaxSize = BatchResponseMaxSize - } - handler := newHandler(ctx, conn, c.idgen, c.services, c.batchRequestLimit, c.batchResponseMaxSize) + handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit, c.batchResponseMaxSize) return &clientConn{conn, handler} } -// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -// And update non-http connection with limit -func (c *Client) SetBatchLimits(limit, size int) { - c.batchRequestLimit = limit - c.batchResponseMaxSize = size - select { - case c.reconnected <- c.writeConn.(ServerCodec): - default: - } -} - func (cc *clientConn) close(err error, inflightReq *requestOp) { cc.handler.close(err, inflightReq) cc.codec.close() @@ -235,7 +219,7 @@ func DialOptions(ctx context.Context, rawurl string, options ...ClientOption) (* return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) } - return newClient(ctx, reconnect) + return newClient(ctx, cfg, reconnect) } // ClientFromContext retrieves the client from the context, if any. This can be used to perform @@ -245,22 +229,24 @@ func ClientFromContext(ctx context.Context) (*Client, bool) { return client, ok } -func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { +func newClient(initctx context.Context, cfg *clientConfig, connect reconnectFunc) (*Client, error) { conn, err := connect(initctx) if err != nil { return nil, err } - c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) + c := initClient(conn, new(serviceRegistry), cfg) c.reconnectFunc = connect return c, nil } -func initClientWithBatchLimits(conn ServerCodec, idgen func() ID, services *serviceRegistry, limit, size int) *Client { +func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) *Client { _, isHTTP := conn.(*httpConn) c := &Client{ isHTTP: isHTTP, - idgen: idgen, services: services, + idgen: cfg.idgen, + batchItemLimit: cfg.batchItemLimit, + batchResponseMaxSize: cfg.batchResponseLimit, writeConn: conn, close: make(chan struct{}), closing: make(chan struct{}), @@ -271,19 +257,26 @@ func initClientWithBatchLimits(conn ServerCodec, idgen func() ID, services *serv reqInit: make(chan *requestOp), reqSent: make(chan error, 1), reqTimeout: make(chan *requestOp), - batchRequestLimit: limit, - batchResponseMaxSize: size, } + + // Set defaults. + if c.idgen == nil { + c.idgen = randomIDGenerator() + } + if c.batchItemLimit == 0 { + c.batchItemLimit = DefaultBatchRequestLimit + } + if c.batchResponseMaxSize == 0 { + c.batchResponseMaxSize = DefaultBatchResponseMaxSize + } + + // Launch the main loop. if !isHTTP { go c.dispatch(conn) } return c } -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { - return initClientWithBatchLimits(conn, idgen, services, 0, 0) -} - // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the diff --git a/rpc/client_opt.go b/rpc/client_opt.go index 5ad7c22b3ce7..5bef08cca841 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -28,11 +28,18 @@ type ClientOption interface { } type clientConfig struct { + // HTTP settings httpClient *http.Client httpHeaders http.Header httpAuth HTTPAuth + // WebSocket options wsDialer *websocket.Dialer + + // RPC handler options + idgen func() ID + batchItemLimit int + batchResponseLimit int } func (cfg *clientConfig) initHeaders() { @@ -104,3 +111,25 @@ func WithHTTPAuth(a HTTPAuth) ClientOption { // Usually, HTTPAuth functions will call h.Set("authorization", "...") to add // auth information to the request. type HTTPAuth func(h http.Header) error + +// WithBatchItemLimit changes the maximum number of items allowed in batch requests. +// +// Note: this option applies when processing incoming batch requests. It does not affect +// batch requests sent by the client. +func WithBatchItemLimit(limit int) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.batchItemLimit = limit + }) +} + +// WithBatchResponseSizeLimit changes the maximum number of response bytes that can be +// generated for batch requests. When this limit is reached, further calls in the batch +// will not be processed. +// +// Note: this option applies when processing incoming batch requests. It does not affect +// batch requests sent by the client. +func WithBatchResponseSizeLimit(sizeLimit int) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.batchResponseLimit = sizeLimit + }) +} diff --git a/rpc/client_test.go b/rpc/client_test.go index 0a88ce40b2a8..23b9c0f3f0c6 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -467,7 +467,8 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) { defer srv.Stop() // Create the client on the other end of the pipe. - client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + client, _ := newClient(context.Background(), cfg, func(context.Context) (ServerCodec, error) { return NewCodec(p2), nil }) defer client.Close() diff --git a/rpc/http.go b/rpc/http.go index 8712f99610b5..723417f752c7 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -139,7 +139,7 @@ func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { var cfg clientConfig cfg.httpClient = client fn := newClientTransportHTTP(endpoint, &cfg) - return newClient(context.Background(), fn) + return newClient(context.Background(), &cfg, fn) } func newClientTransportHTTP(endpoint string, cfg *clientConfig) reconnectFunc { diff --git a/rpc/inproc.go b/rpc/inproc.go index fbe9a40ceca9..306974e04b81 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -24,7 +24,8 @@ import ( // DialInProc attaches an in-process connection to the given RPC server. func DialInProc(handler *Server) *Client { initctx := context.Background() - c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + c, _ := newClient(initctx, cfg, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() go handler.ServeCodec(NewCodec(p1), 0) return NewCodec(p2), nil diff --git a/rpc/ipc.go b/rpc/ipc.go index d9e0de62e877..a08245b27089 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -46,7 +46,8 @@ func (s *Server) ServeListener(l net.Listener) error { // The context is used for the initial connection establishment. It does not // affect subsequent interactions with the client. func DialIPC(ctx context.Context, endpoint string) (*Client, error) { - return newClient(ctx, newClientTransportIPC(endpoint)) + cfg := new(clientConfig) + return newClient(ctx, cfg, newClientTransportIPC(endpoint)) } func newClientTransportIPC(endpoint string) reconnectFunc { diff --git a/rpc/server.go b/rpc/server.go index e0835ff9570c..95642ebc8218 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -46,11 +46,11 @@ type Server struct { services serviceRegistry idgen func() ID - mutex sync.Mutex - codecs map[ServerCodec]struct{} - run int32 - batchRequestLimit int - batchResponseMaxSize int + mutex sync.Mutex + codecs map[ServerCodec]struct{} + run int32 + batchItemLimit int + batchResponseLimit int } // NewServer creates a new server instance with no registered handlers. @@ -67,10 +67,15 @@ func NewServer() *Server { return server } -// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -func (s *Server) SetBatchLimits(limit, size int) { - s.batchRequestLimit = limit - s.batchResponseMaxSize = size +// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit' +// is the maximum number of items in a batch. 'maxResponseSize' is the maximum number of +// response bytes across all requests in a batch. +// +// This method should be called before processing any requests via ServeCodec, ServeHTTP, +// ServeListener etc. +func (s *Server) SetBatchLimits(itemLimit, maxResponseSize int) { + s.batchItemLimit = itemLimit + s.batchResponseLimit = maxResponseSize } // RegisterName creates a service for the given receiver type under the given name. When no @@ -94,7 +99,12 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { } defer s.untrackCodec(codec) - c := initClientWithBatchLimits(codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize) + cfg := &clientConfig{ + idgen: s.idgen, + batchItemLimit: s.batchItemLimit, + batchResponseLimit: s.batchResponseLimit, + } + c := initClient(codec, &s.services, cfg) <-codec.closed() c.Close() } @@ -126,7 +136,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { return } - h := newHandler(ctx, codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize) + h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit, s.batchResponseLimit) h.allowSubscribe = false defer h.close(io.EOF, nil) diff --git a/rpc/stdio.go b/rpc/stdio.go index ae32db26ef1c..084e5f0700ce 100644 --- a/rpc/stdio.go +++ b/rpc/stdio.go @@ -32,7 +32,8 @@ func DialStdIO(ctx context.Context) (*Client, error) { // DialIO creates a client which uses the given IO channels func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { - return newClient(ctx, newClientTransportIO(in, out)) + cfg := new(clientConfig) + return newClient(ctx, cfg, newClientTransportIO(in, out)) } func newClientTransportIO(in io.Reader, out io.Writer) reconnectFunc { diff --git a/rpc/websocket.go b/rpc/websocket.go index 0ac2a2792d5a..61b9bf5e38b4 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -197,7 +197,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale if err != nil { return nil, err } - return newClient(ctx, connect) + return newClient(ctx, cfg, connect) } // DialWebsocket creates a new RPC client that communicates with a JSON-RPC server @@ -214,7 +214,7 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error if err != nil { return nil, err } - return newClient(ctx, connect) + return newClient(ctx, cfg, connect) } func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, error) { From bae5a2f45872caa7e9b7f31a3be31d7698b00481 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 17 Feb 2023 18:53:16 +0100 Subject: [PATCH 12/27] node: refactor passing around rpc config Passing options in function parameters makes it hard to add more options later. Pass them in a struct instead. --- node/api.go | 12 ++++++++++-- node/node.go | 34 ++++++++++++++++++++++------------ node/rpcstack.go | 24 +++++++++++++++--------- node/rpcstack_test.go | 10 ++++++---- 4 files changed, 53 insertions(+), 27 deletions(-) diff --git a/node/api.go b/node/api.go index dd6a208b0734..f81f394beb24 100644 --- a/node/api.go +++ b/node/api.go @@ -176,6 +176,10 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri CorsAllowedOrigins: api.node.config.HTTPCors, Vhosts: api.node.config.HTTPVirtualHosts, Modules: api.node.config.HTTPModules, + rpcEndpointConfig: rpcEndpointConfig{ + batchItemLimit: api.node.config.BatchRequestLimit, + batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, + }, } if cors != nil { config.CorsAllowedOrigins = nil @@ -199,7 +203,7 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri if err := api.node.http.setListenAddr(*host, *port); err != nil { return false, err } - if err := api.node.http.enableRPC(api.node.rpcAPIs, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil { + if err := api.node.http.enableRPC(api.node.rpcAPIs, config); err != nil { return false, err } if err := api.node.http.start(); err != nil { @@ -250,6 +254,10 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap Modules: api.node.config.WSModules, Origins: api.node.config.WSOrigins, // ExposeAll: api.node.config.WSExposeAll, + rpcEndpointConfig: rpcEndpointConfig{ + batchItemLimit: api.node.config.BatchRequestLimit, + batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, + }, } if apis != nil { config.Modules = nil @@ -270,7 +278,7 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap return false, err } openApis, _ := api.node.getAPIs() - if err := server.enableWS(openApis, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil { + if err := server.enableWS(openApis, config); err != nil { return false, err } if err := server.start(); err != nil { diff --git a/node/node.go b/node/node.go index 37086d293309..8e4c6e5df76e 100644 --- a/node/node.go +++ b/node/node.go @@ -404,6 +404,11 @@ func (n *Node) startRPC() error { openAPIs, allAPIs = n.getAPIs() ) + rpcConfig := rpcEndpointConfig{ + batchItemLimit: n.config.BatchRequestLimit, + batchResponseSizeLimit: n.config.BatchResponseMaxSize, + } + initHttp := func(server *httpServer, port int) error { if err := server.setListenAddr(n.config.HTTPHost, port); err != nil { return err @@ -413,7 +418,8 @@ func (n *Node) startRPC() error { Vhosts: n.config.HTTPVirtualHosts, Modules: n.config.HTTPModules, prefix: n.config.HTTPPathPrefix, - }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { + rpcEndpointConfig: rpcConfig, + }); err != nil { return err } servers = append(servers, server) @@ -426,10 +432,11 @@ func (n *Node) startRPC() error { return err } if err := server.enableWS(openAPIs, wsConfig{ - Modules: n.config.WSModules, - Origins: n.config.WSOrigins, - prefix: n.config.WSPathPrefix, - }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { + Modules: n.config.WSModules, + Origins: n.config.WSOrigins, + prefix: n.config.WSPathPrefix, + rpcEndpointConfig: rpcConfig, + }); err != nil { return err } servers = append(servers, server) @@ -442,27 +449,30 @@ func (n *Node) startRPC() error { if err := server.setListenAddr(n.config.AuthAddr, port); err != nil { return err } + sharedConfig := rpcConfig + sharedConfig.jwtSecret = secret if err := server.enableRPC(allAPIs, httpConfig{ CorsAllowedOrigins: DefaultAuthCors, Vhosts: n.config.AuthVirtualHosts, Modules: DefaultAuthModules, prefix: DefaultAuthPrefix, - jwtSecret: secret, - }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { + rpcEndpointConfig: sharedConfig, + }); err != nil { return err } servers = append(servers, server) + // Enable auth via WS server = n.wsServerForPort(port, true) if err := server.setListenAddr(n.config.AuthAddr, port); err != nil { return err } if err := server.enableWS(allAPIs, wsConfig{ - Modules: DefaultAuthModules, - Origins: DefaultAuthOrigins, - prefix: DefaultAuthPrefix, - jwtSecret: secret, - }, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil { + Modules: DefaultAuthModules, + Origins: DefaultAuthOrigins, + prefix: DefaultAuthPrefix, + rpcEndpointConfig: sharedConfig, + }); err != nil { return err } servers = append(servers, server) diff --git a/node/rpcstack.go b/node/rpcstack.go index bc65d65b43d7..e91585a2b630 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -41,15 +41,21 @@ type httpConfig struct { CorsAllowedOrigins []string Vhosts []string prefix string // path prefix on which to mount http handler - jwtSecret []byte // optional JWT secret + rpcEndpointConfig } // wsConfig is the JSON-RPC/Websocket configuration type wsConfig struct { - Origins []string - Modules []string - prefix string // path prefix on which to mount ws handler - jwtSecret []byte // optional JWT secret + Origins []string + Modules []string + prefix string // path prefix on which to mount ws handler + rpcEndpointConfig +} + +type rpcEndpointConfig struct { + jwtSecret []byte // optional JWT secret + batchItemLimit int + batchResponseSizeLimit int } type rpcHandler struct { @@ -287,7 +293,7 @@ func (h *httpServer) doStop() { } // enableRPC turns on JSON-RPC over HTTP on the server. -func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, batchRequestLimit, batchResponseMaxSize int) error { +func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { h.mu.Lock() defer h.mu.Unlock() @@ -297,7 +303,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, batchRequestLi // Create RPC server and handler. srv := rpc.NewServer() - srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize) + srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } @@ -320,7 +326,7 @@ func (h *httpServer) disableRPC() bool { } // enableWS turns on JSON-RPC over WebSocket on the server. -func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, batchRequestLimit, batchResponseMaxSize int) error { +func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { h.mu.Lock() defer h.mu.Unlock() @@ -329,7 +335,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, batchRequestLimit } // Create RPC server and handler. srv := rpc.NewServer() - srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize) + srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } diff --git a/node/rpcstack_test.go b/node/rpcstack_test.go index f6fd14cd8f42..600db4333030 100644 --- a/node/rpcstack_test.go +++ b/node/rpcstack_test.go @@ -242,9 +242,9 @@ func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsCon timeouts = &rpc.DefaultHTTPTimeouts } srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), *timeouts) - assert.NoError(t, srv.enableRPC(apis(), *conf, 0, 0)) + assert.NoError(t, srv.enableRPC(apis(), *conf)) if ws { - assert.NoError(t, srv.enableWS(nil, *wsConf, 0, 0)) + assert.NoError(t, srv.enableWS(nil, *wsConf)) } assert.NoError(t, srv.setListenAddr("localhost", 0)) assert.NoError(t, srv.start()) @@ -338,8 +338,10 @@ func TestJWT(t *testing.T) { ss, _ := jwt.NewWithClaims(method, testClaim(input)).SignedString(secret) return ss } - srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")}, - true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")}, nil) + cfg := rpcEndpointConfig{jwtSecret: []byte("secret")} + httpcfg := &httpConfig{rpcEndpointConfig: cfg} + wscfg := &wsConfig{Origins: []string{"*"}, rpcEndpointConfig: cfg} + srv := createAndStartServer(t, httpcfg, true, wscfg, nil) wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr()) htUrl := fmt.Sprintf("http://%v", srv.listenAddr()) From 6c6b8b1b8a28f546d51720113316576c453b56d3 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 17 Feb 2023 21:01:12 +0100 Subject: [PATCH 13/27] rpc: increase default batch limits --- rpc/client.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 6dcb5d97f957..e9cca8379e83 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -40,14 +40,16 @@ var ( errDead = errors.New("connection lost") ) +// Timeouts const ( - // Timeouts defaultDialTimeout = 10 * time.Second // used if context has no deadline subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) - // Batch limits - DefaultBatchRequestLimit = 100 // Maximum number of requests in a batch - DefaultBatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB) +// Batch limits +const ( + DefaultBatchRequestLimit = 1000 // Maximum number of items in a batch. + DefaultBatchResponseMaxSize = 25 * 1000 * 1000 // Maximum number of bytes returned from calls. ) const ( From cebe2263e16b19d69bebdc84cf125358988a5249 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 17 Feb 2023 21:12:21 +0100 Subject: [PATCH 14/27] rpc: simplify sending error response --- rpc/handler.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/rpc/handler.go b/rpc/handler.go index 1bd6f656f8d9..11017fefd7ac 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -138,31 +138,15 @@ func (b *batchCallBuffer) write(ctx context.Context, conn jsonWriter) { b.doWrite(ctx, conn, false) } -// timeout sends the responses added so far. For the remaining unanswered call -// messages, it sends a timeout error response. -func (b *batchCallBuffer) timeout(ctx context.Context, conn jsonWriter) { +// respondWithError sends the responses added so far. For the remaining unanswered call +// messages, it responds with the given error. +func (b *batchCallBuffer) respondWithError(ctx context.Context, conn jsonWriter, err error) { b.mutex.Lock() defer b.mutex.Unlock() for _, msg := range b.calls { if !msg.isNotification() { - resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) - b.resp = append(b.resp, resp) - } - } - b.doWrite(ctx, conn, true) -} - -// responseTooLarge sends the responses added so far. For the remaining unanswered call -// messages, it sends a response too large error response. -func (b *batchCallBuffer) responseTooLarge(ctx context.Context, conn jsonWriter) { - b.mutex.Lock() - defer b.mutex.Unlock() - - for _, msg := range b.calls { - if !msg.isNotification() { - resp := msg.errorResponse(&internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge}) - b.resp = append(b.resp, resp) + b.resp = append(b.resp, msg.errorResponse(err)) } } b.doWrite(ctx, conn, true) @@ -226,7 +210,8 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { if timeout, ok := ContextRequestTimeout(cp.ctx); ok { timer = time.AfterFunc(timeout, func() { cancel() - callBuffer.timeout(cp.ctx, h.conn) + err := &internalServerError{errcodeTimeout, errMsgTimeout} + callBuffer.respondWithError(cp.ctx, h.conn, err) }) } @@ -244,7 +229,8 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { callBuffer.pushResponse(resp) if resp != nil && h.batchResponseMaxSize != 0 { if resBytes += len(resp.Result); resBytes > h.batchResponseMaxSize { - callBuffer.responseTooLarge(cp.ctx, h.conn) + err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} + callBuffer.respondWithError(cp.ctx, h.conn, err) break } } From 333dffb15b3de0c470e1e6872d964f7c2a15a2d5 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 17 Feb 2023 23:51:55 +0100 Subject: [PATCH 15/27] rpc: rename variable --- rpc/handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rpc/handler.go b/rpc/handler.go index 11017fefd7ac..54c24328864b 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -215,7 +215,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) } - resBytes := 0 + responseBytes := 0 for { // No need to handle rest of calls if timed out. if cp.ctx.Err() != nil { @@ -228,7 +228,8 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { resp := h.handleCallMsg(cp, msg) callBuffer.pushResponse(resp) if resp != nil && h.batchResponseMaxSize != 0 { - if resBytes += len(resp.Result); resBytes > h.batchResponseMaxSize { + responseBytes += len(resp.Result) + if responseBytes > h.batchResponseMaxSize { err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} callBuffer.respondWithError(cp.ctx, h.conn, err) break From b91f08a0c24affb0b74630acf81444e77e1d55a2 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 18 Feb 2023 00:02:40 +0100 Subject: [PATCH 16/27] rpc: add test for batch size limit --- rpc/server_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/rpc/server_test.go b/rpc/server_test.go index c9abe53e5210..7d5024606211 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -152,3 +152,38 @@ func TestServerShortLivedConn(t *testing.T) { } } } + +func TestServerBatchResponseSizeLimit(t *testing.T) { + server := newTestServer() + server.SetBatchLimits(100, 60) + defer server.Stop() + + client := DialInProc(server) + batch := make([]BatchElem, 3) + for i := range batch { + batch[i].Method = "test_echo" + batch[i].Result = new(echoResult) + batch[i].Args = []any{"x", 1} + } + err := client.BatchCall(batch) + + if err != nil { + t.Fatal("error sending batch:", err) + } + for i := range batch { + if i < 2 { + if batch[i].Error != nil { + t.Errorf("batch elem %d has unexpected error: %v", i, batch[i].Error) + } + } else { + re, ok := batch[i].Error.(Error) + if !ok { + t.Errorf("batch elem %d has wrong error: %v", i, batch[i].Error) + continue + } + if re.ErrorCode() != errcodeResponseTooLarge { + t.Errorf("batch elem %d has wrong error code %d", i, re.ErrorCode()) + } + } + } +} From fdf1b2072f8d7d5bf21b6c5c246e68b22d0c7600 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Sat, 18 Feb 2023 08:45:00 +0800 Subject: [PATCH 17/27] handle msg id for batch too large --- rpc/client.go | 3 +++ rpc/errors.go | 1 + rpc/handler.go | 17 +++++++++-------- rpc/server_test.go | 19 +++++++++++++++++++ 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index e9cca8379e83..89b6abe64820 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -433,6 +433,9 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { // only sends valid IDs to our channel. elem := &b[byID[string(resp.ID)]] if resp.Error != nil { + if resp.Error.Message == errMsgBatchTooLarge { + return resp.Error + } elem.Error = resp.Error continue } diff --git a/rpc/errors.go b/rpc/errors.go index 36d2c82a73c7..abb698af75c1 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -69,6 +69,7 @@ const ( const ( errMsgTimeout = "request timed out" errMsgResponseTooLarge = "response too large" + errMsgBatchTooLarge = "batch too large" ) type methodNotFoundError struct{ method string } diff --git a/rpc/handler.go b/rpc/handler.go index 54c24328864b..2e11326c13d1 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -175,14 +175,6 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } - if len(msgs) > h.batchRequestLimit && h.batchRequestLimit != 0 { - h.startCallProc(func(cp *callProc) { - resp := errorMessage(&invalidRequestError{"batch too large"}) - h.conn.writeJSON(cp.ctx, resp, true) - }) - return - } - // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range msgs { @@ -193,6 +185,15 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { if len(calls) == 0 { return } + + if len(calls) > h.batchRequestLimit && h.batchRequestLimit != 0 { + h.startCallProc(func(cp *callProc) { + resp := calls[0].errorResponse(&invalidRequestError{errMsgBatchTooLarge}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + return + } + // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { var ( diff --git a/rpc/server_test.go b/rpc/server_test.go index 7d5024606211..cafb13ec12e2 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -187,3 +187,22 @@ func TestServerBatchResponseSizeLimit(t *testing.T) { } } } + +func TestServerBatchRequestLimit(t *testing.T) { + server := newTestServer() + server.SetBatchLimits(2, 100000) + defer server.Stop() + + client := DialInProc(server) + batch := make([]BatchElem, 3) + for i := range batch { + batch[i].Method = "test_echo" + batch[i].Result = new(echoResult) + batch[i].Args = []any{"x", 1} + } + err := client.BatchCall(batch) + + if err.Error() != errMsgBatchTooLarge { + t.Errorf("error mismatch: %v", err) + } +} From 127079b8d3b2fd5d98a691b8e074f2862b9d13fb Mon Sep 17 00:00:00 2001 From: mmsqe Date: Fri, 3 Mar 2023 09:31:20 +0800 Subject: [PATCH 18/27] test batch request limit for non-call --- rpc/client.go | 19 +++++++++++++++++++ rpc/handler.go | 24 +++++++++++++----------- rpc/server_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 89b6abe64820..e9a1d724939e 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -463,6 +463,25 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) return c.send(ctx, op, msg) } +// BatchNotify sends batched notifications, i.e. method calls that don't expect response. +func (c *Client) BatchNotify(ctx context.Context, method string, args [][]interface{}) error { + op := new(requestOp) + msgs := make([]*jsonrpcMessage, 0, len(args)) + for _, args := range args { + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + msg.ID = nil + msgs = append(msgs, msg) + } + + if c.isHTTP { + return c.sendHTTP(ctx, op, msgs) + } + return c.send(ctx, op, msgs) +} + // EthSubscribe registers a subscription under the "eth" namespace. func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { return c.Subscribe(ctx, "eth", channel, args...) diff --git a/rpc/handler.go b/rpc/handler.go index 2e11326c13d1..33dcbc2c3f21 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -19,13 +19,14 @@ package rpc import ( "context" "encoding/json" + "log" "reflect" "strconv" "strings" "sync" "time" - "github.com/ethereum/go-ethereum/log" + logger "github.com/ethereum/go-ethereum/log" ) // handler handles JSON-RPC messages. There is one handler per connection. Note that @@ -58,7 +59,7 @@ type handler struct { rootCtx context.Context // canceled by close() cancelRoot func() // cancel function for rootCtx conn jsonWriter // where responses will be sent - log log.Logger + log logger.Logger allowSubscribe bool batchRequestLimit int batchResponseMaxSize int @@ -84,7 +85,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * cancelRoot: cancelRoot, allowSubscribe: true, serverSubs: make(map[ID]*Subscription), - log: log.Root(), + log: logger.Root(), batchRequestLimit: batchRequestLimit, batchResponseMaxSize: batchResponseMaxSize, } @@ -175,6 +176,15 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } + if len(msgs) > h.batchRequestLimit && h.batchRequestLimit != 0 { + h.startCallProc(func(cp *callProc) { + resp := msgs[0].errorResponse(&invalidRequestError{errMsgBatchTooLarge}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + log.Println("return for batch too large") + return + } + // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range msgs { @@ -186,14 +196,6 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } - if len(calls) > h.batchRequestLimit && h.batchRequestLimit != 0 { - h.startCallProc(func(cp *callProc) { - resp := calls[0].errorResponse(&invalidRequestError{errMsgBatchTooLarge}) - h.conn.writeJSON(cp.ctx, resp, true) - }) - return - } - // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { var ( diff --git a/rpc/server_test.go b/rpc/server_test.go index cafb13ec12e2..c7a7e9f93e2c 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -19,7 +19,10 @@ package rpc import ( "bufio" "bytes" + "context" + "fmt" "io" + "log" "net" "os" "path/filepath" @@ -206,3 +209,45 @@ func TestServerBatchRequestLimit(t *testing.T) { t.Errorf("error mismatch: %v", err) } } + +func mockLogger(t *testing.T) (*bufio.Scanner, *os.File, *os.File) { + // rm time prefix in test + log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime)) + reader, writer, err := os.Pipe() + if err != nil { + t.Fatalf("couldn't get os Pipe: %v", err) + } + log.SetOutput(writer) + return bufio.NewScanner(reader), reader, writer +} + +func resetLogger(reader *os.File, writer *os.File) { + err := reader.Close() + if err != nil { + fmt.Println("error closing reader was ", err) + } + if err = writer.Close(); err != nil { + fmt.Println("error closing writer was ", err) + } + log.SetOutput(os.Stderr) +} + +func TestServerBatchRequestLimitForNonCall(t *testing.T) { + scanner, reader, writer := mockLogger(t) + defer resetLogger(reader, writer) + + server := newTestServer() + server.SetBatchLimits(2, 100000) + defer server.Stop() + + client := DialInProc(server) + args := make([][]interface{}, 3) + for i := range args { + args[i] = make([]interface{}, 0) + } + client.BatchNotify(context.Background(), "ex_subscription", args) + scanner.Scan() + if scanner.Text() != "return for batch too large" { + t.Error("missing return") + } +} From e82658a42d1184754627fde781bfe17d57a22c01 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Mon, 6 Mar 2023 22:34:24 +0800 Subject: [PATCH 19/27] rm non-call test --- rpc/client.go | 19 ------------------- rpc/handler.go | 8 +++----- rpc/server_test.go | 45 --------------------------------------------- 3 files changed, 3 insertions(+), 69 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index e9a1d724939e..89b6abe64820 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -463,25 +463,6 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) return c.send(ctx, op, msg) } -// BatchNotify sends batched notifications, i.e. method calls that don't expect response. -func (c *Client) BatchNotify(ctx context.Context, method string, args [][]interface{}) error { - op := new(requestOp) - msgs := make([]*jsonrpcMessage, 0, len(args)) - for _, args := range args { - msg, err := c.newMessage(method, args...) - if err != nil { - return err - } - msg.ID = nil - msgs = append(msgs, msg) - } - - if c.isHTTP { - return c.sendHTTP(ctx, op, msgs) - } - return c.send(ctx, op, msgs) -} - // EthSubscribe registers a subscription under the "eth" namespace. func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { return c.Subscribe(ctx, "eth", channel, args...) diff --git a/rpc/handler.go b/rpc/handler.go index 33dcbc2c3f21..1ebb113d5931 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -19,14 +19,13 @@ package rpc import ( "context" "encoding/json" - "log" "reflect" "strconv" "strings" "sync" "time" - logger "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" ) // handler handles JSON-RPC messages. There is one handler per connection. Note that @@ -59,7 +58,7 @@ type handler struct { rootCtx context.Context // canceled by close() cancelRoot func() // cancel function for rootCtx conn jsonWriter // where responses will be sent - log logger.Logger + log log.Logger allowSubscribe bool batchRequestLimit int batchResponseMaxSize int @@ -85,7 +84,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * cancelRoot: cancelRoot, allowSubscribe: true, serverSubs: make(map[ID]*Subscription), - log: logger.Root(), + log: log.Root(), batchRequestLimit: batchRequestLimit, batchResponseMaxSize: batchResponseMaxSize, } @@ -181,7 +180,6 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { resp := msgs[0].errorResponse(&invalidRequestError{errMsgBatchTooLarge}) h.conn.writeJSON(cp.ctx, resp, true) }) - log.Println("return for batch too large") return } diff --git a/rpc/server_test.go b/rpc/server_test.go index c7a7e9f93e2c..cafb13ec12e2 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -19,10 +19,7 @@ package rpc import ( "bufio" "bytes" - "context" - "fmt" "io" - "log" "net" "os" "path/filepath" @@ -209,45 +206,3 @@ func TestServerBatchRequestLimit(t *testing.T) { t.Errorf("error mismatch: %v", err) } } - -func mockLogger(t *testing.T) (*bufio.Scanner, *os.File, *os.File) { - // rm time prefix in test - log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime)) - reader, writer, err := os.Pipe() - if err != nil { - t.Fatalf("couldn't get os Pipe: %v", err) - } - log.SetOutput(writer) - return bufio.NewScanner(reader), reader, writer -} - -func resetLogger(reader *os.File, writer *os.File) { - err := reader.Close() - if err != nil { - fmt.Println("error closing reader was ", err) - } - if err = writer.Close(); err != nil { - fmt.Println("error closing writer was ", err) - } - log.SetOutput(os.Stderr) -} - -func TestServerBatchRequestLimitForNonCall(t *testing.T) { - scanner, reader, writer := mockLogger(t) - defer resetLogger(reader, writer) - - server := newTestServer() - server.SetBatchLimits(2, 100000) - defer server.Stop() - - client := DialInProc(server) - args := make([][]interface{}, 3) - for i := range args { - args[i] = make([]interface{}, 0) - } - client.BatchNotify(context.Background(), "ex_subscription", args) - scanner.Scan() - if scanner.Text() != "return for batch too large" { - t.Error("missing return") - } -} From 47557d11f33bf7965b7f2c18368866bd74f492e4 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 31 May 2023 09:38:31 +0200 Subject: [PATCH 20/27] cmd/utils: fix docs on flags --- cmd/utils/flags.go | 2 +- node/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 430d385dadce..286b299eda24 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -726,7 +726,7 @@ var ( } BatchResponseMaxSize = &cli.IntFlag{ Name: "rpc.batch-response-max-size", - Usage: "Maximum number of bytes returned from calls (10MB)", + Usage: "Maximum number of bytes returned from a batched call", Value: rpc.DefaultBatchResponseMaxSize, Category: flags.APICategory, } diff --git a/node/config.go b/node/config.go index c770bcd1c339..37c1e4882b84 100644 --- a/node/config.go +++ b/node/config.go @@ -200,7 +200,7 @@ type Config struct { // BatchRequestLimit is the maximum number of requests in a batch. BatchRequestLimit int `toml:",omitempty"` - // BatchResponseMaxSize is the maximum number of bytes returned from calls (10MB). + // BatchResponseMaxSize is the maximum number of bytes returned from a batched rpc call. BatchResponseMaxSize int `toml:",omitempty"` // JWTSecret is the path to the hex-encoded jwt secret. From 8e6018f5009217289997aac2c4529aed464c300e Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 31 May 2023 10:02:02 +0200 Subject: [PATCH 21/27] rpc: minor refactor of tests --- rpc/server_test.go | 60 +++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/rpc/server_test.go b/rpc/server_test.go index 7ae8dd78c4a6..80bffef02f9a 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -155,54 +155,48 @@ func TestServerShortLivedConn(t *testing.T) { func TestServerBatchResponseSizeLimit(t *testing.T) { server := newTestServer() - server.SetBatchLimits(100, 60) defer server.Stop() - - client := DialInProc(server) - batch := make([]BatchElem, 3) - for i := range batch { - batch[i].Method = "test_echo" - batch[i].Result = new(echoResult) - batch[i].Args = []any{"x", 1} + server.SetBatchLimits(100, 60) + var ( + batch []BatchElem + client = DialInProc(server) + ) + for i := 0; i < 5; i++ { + batch = append(batch, BatchElem{ + Method: "test_echo", + Args: []any{"x", 1}, + Result: new(echoResult), + }) } - err := client.BatchCall(batch) - - if err != nil { + if err := client.BatchCall(batch); err != nil { t.Fatal("error sending batch:", err) } for i := range batch { + // We expect the first two queries to be ok, but after that the + // size limit hits. if i < 2 { if batch[i].Error != nil { - t.Errorf("batch elem %d has unexpected error: %v", i, batch[i].Error) - } - } else { - re, ok := batch[i].Error.(Error) - if !ok { - t.Errorf("batch elem %d has wrong error: %v", i, batch[i].Error) - continue - } - if re.ErrorCode() != errcodeResponseTooLarge { - t.Errorf("batch elem %d has wrong error code %d", i, re.ErrorCode()) + t.Fatalf("batch elem %d has unexpected error: %v", i, batch[i].Error) } + continue + } + // After two, we expect error + re, ok := batch[i].Error.(Error) + if !ok { + t.Fatalf("batch elem %d has wrong error: %v", i, batch[i].Error) + } + if have, want := re.ErrorCode(), errcodeResponseTooLarge; have != want { + t.Errorf("batch elem %d wrong error code, have %d want %d", i, have, want) } } } func TestServerBatchRequestLimit(t *testing.T) { server := newTestServer() - server.SetBatchLimits(2, 100000) defer server.Stop() - + server.SetBatchLimits(2, 100000) client := DialInProc(server) - batch := make([]BatchElem, 3) - for i := range batch { - batch[i].Method = "test_echo" - batch[i].Result = new(echoResult) - batch[i].Args = []any{"x", 1} - } - err := client.BatchCall(batch) - - if err.Error() != errMsgBatchTooLarge { - t.Errorf("error mismatch: %v", err) + if have, want := client.BatchCall(make([]BatchElem, 3)).Error(), errMsgBatchTooLarge; have != want { + t.Errorf("error mismatch, have %v want %v", have, want) } } From acf5730238ca3746659da90f2da76339327b89da Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 8 Jun 2023 13:03:30 +0200 Subject: [PATCH 22/27] rpc: improve client batch response handling This changes how we handle batch responses in order to be able to react to situations where the server does not provide a response for every request batch element. For some weird reason, when writing the original client implementation, I worked off of the assumption that responses could be distributed across batches arbitrarily. So for a batch request containing requests [A, B, C], the server could respond with [A B C] but also with [A B] [C] or even [A] [B] [C] and it wouldn't make a difference to the client. So in the implementation of BatchCallContext, the client waited for all requests in the batch individually. If the server didn't respond to all requests in the batch, the client would eventually just time out. With the addition of batch limits into the server, I anticipate that people will hit this kind of error way more often. To handle this properly, the client now waits for a single response batch and expects it to contain all responses to the requests. --- rpc/client.go | 69 +++++++++++----- rpc/client_test.go | 6 +- rpc/handler.go | 196 +++++++++++++++++++++++++-------------------- rpc/http.go | 14 ++-- rpc/server_test.go | 35 ++++++-- 5 files changed, 195 insertions(+), 125 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 40a0c99b00a1..c8c956e44c7e 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -35,6 +35,7 @@ var ( ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") ErrNoResult = errors.New("no result in JSON-RPC response") + ErrMissingBatchResp = errors.New("response batch did not contain a result for this call") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") errClientReconnected = errors.New("client reconnected") errDead = errors.New("connection lost") @@ -138,14 +139,17 @@ type readOp struct { batch bool } +// requestOp represents a pending request. This is used for both batch and non-batch +// requests. type requestOp struct { - ids []json.RawMessage - err error - resp chan *jsonrpcMessage // receives up to len(ids) responses - sub *ClientSubscription // only set for EthSubscribe requests + ids []json.RawMessage + err error + resp chan []*jsonrpcMessage // the response goes here + sub *ClientSubscription // set for Subscribe requests. + hadResponse bool // true when the request was responded to } -func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { +func (op *requestOp) wait(ctx context.Context, c *Client) ([]*jsonrpcMessage, error) { select { case <-ctx.Done(): // Send the timeout to dispatch so it can remove the request IDs. @@ -350,7 +354,10 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str if err != nil { return err } - op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan []*jsonrpcMessage, 1), + } if c.isHTTP { err = c.sendHTTP(ctx, op, msg) @@ -362,9 +369,12 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str } // dispatch has accepted the request and will close the channel when it quits. - switch resp, err := op.wait(ctx, c); { - case err != nil: + batchresp, err := op.wait(ctx, c) + if err != nil { return err + } + resp := batchresp[0] + switch { case resp.Error != nil: return resp.Error case len(resp.Result) == 0: @@ -405,7 +415,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { ) op := &requestOp{ ids: make([]json.RawMessage, len(b)), - resp: make(chan *jsonrpcMessage, len(b)), + resp: make(chan []*jsonrpcMessage, 1), } for i, elem := range b { msg, err := c.newMessage(elem.Method, elem.Args...) @@ -423,22 +433,32 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { } else { err = c.send(ctx, op, msgs) } + if err != nil { + return err + } + + batchresp, err := op.wait(ctx, c) + if err != nil { + return err + } // Wait for all responses to come back. - for n := 0; n < len(b) && err == nil; n++ { - var resp *jsonrpcMessage - resp, err = op.wait(ctx, c) - if err != nil { - break + for n := 0; n < len(batchresp) && err == nil; n++ { + resp := batchresp[n] + if resp == nil { + // Ignore null responses. These can happen for batches sent via HTTP. + continue } + // Find the element corresponding to this response. - // The element is guaranteed to be present because dispatch - // only sends valid IDs to our channel. - elem := &b[byID[string(resp.ID)]] + index, ok := byID[string(resp.ID)] + if !ok { + continue + } + delete(byID, string(resp.ID)) + + elem := &b[index] if resp.Error != nil { - if resp.Error.Message == errMsgBatchTooLarge { - return resp.Error - } elem.Error = resp.Error continue } @@ -448,6 +468,13 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { } elem.Error = json.Unmarshal(resp.Result, elem.Result) } + + // Check that all expected responses have been received. + for _, index := range byID { + elem := &b[index] + elem.Error = ErrMissingBatchResp + } + return err } @@ -508,7 +535,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf } op := &requestOp{ ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), + resp: make(chan []*jsonrpcMessage, 1), sub: newClientSubscription(c, namespace, chanVal), } diff --git a/rpc/client_test.go b/rpc/client_test.go index fbbcbff85eda..1c12d867ee21 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -169,10 +169,12 @@ func TestClientBatchRequest(t *testing.T) { } } +// This checks that, for HTTP connections, the length of batch responses is validated to +// match the request exactly. func TestClientBatchRequest_len(t *testing.T) { b, err := json.Marshal([]jsonrpcMessage{ - {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, - {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, + {Version: "2.0", ID: json.RawMessage("1"), Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Result: json.RawMessage(`"0x2"`)}, }) if err != nil { t.Fatal("failed to encode jsonrpc message:", err) diff --git a/rpc/handler.go b/rpc/handler.go index 1ebb113d5931..63b149623e33 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -164,6 +164,16 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR } } +// handleMsg handles a single non-batch message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + msgs := []*jsonrpcMessage{msg} + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + h.startCallProc(func(cp *callProc) { + h.handleNonBatchCall(cp, msg) + }) + }) +} + // handleBatch executes all messages in a batch and returns the responses. func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: @@ -183,13 +193,12 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } - // Handle non-call messages first: + // Handle non-call messages first. + // Here we need to find the requestOp that sent the request batch. calls := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range msgs { - if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - } - } + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + calls = append(calls, msg) + }) if len(calls) == 0 { return } @@ -241,6 +250,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { timer.Stop() } callBuffer.write(cp.ctx, h.conn) + h.addSubscriptions(cp.notifiers) for _, n := range cp.notifiers { n.activate() @@ -248,47 +258,41 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) } -// handleMsg handles a single message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - if ok := h.handleImmediate(msg); ok { - return - } - h.startCallProc(func(cp *callProc) { - var ( - responded sync.Once - timer *time.Timer - cancel context.CancelFunc - ) - cp.ctx, cancel = context.WithCancel(cp.ctx) - defer cancel() - - // Cancel the request context after timeout and send an error response. Since the - // running method might not return immediately on timeout, we must wait for the - // timeout concurrently with processing the request. - if timeout, ok := ContextRequestTimeout(cp.ctx); ok { - timer = time.AfterFunc(timeout, func() { - cancel() - responded.Do(func() { - resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) - h.conn.writeJSON(cp.ctx, resp, true) - }) - }) - } - - answer := h.handleCallMsg(cp, msg) - if timer != nil { - timer.Stop() - } - h.addSubscriptions(cp.notifiers) - if answer != nil { +func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { + var ( + responded sync.Once + timer *time.Timer + cancel context.CancelFunc + ) + cp.ctx, cancel = context.WithCancel(cp.ctx) + defer cancel() + + // Cancel the request context after timeout and send an error response. Since the + // running method might not return immediately on timeout, we must wait for the + // timeout concurrently with processing the request. + if timeout, ok := ContextRequestTimeout(cp.ctx); ok { + timer = time.AfterFunc(timeout, func() { + cancel() responded.Do(func() { - h.conn.writeJSON(cp.ctx, answer, false) + resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) + h.conn.writeJSON(cp.ctx, resp, true) }) - } - for _, n := range cp.notifiers { - n.activate() - } - }) + }) + } + + answer := h.handleCallMsg(cp, msg) + if timer != nil { + timer.Stop() + } + h.addSubscriptions(cp.notifiers) + if answer != nil { + responded.Do(func() { + h.conn.writeJSON(cp.ctx, answer, false) + }) + } + for _, n := range cp.notifiers { + n.activate() + } } // close cancels all requests except for inflightReq and waits for @@ -371,23 +375,60 @@ func (h *handler) startCallProc(fn func(*callProc)) { }() } -// handleImmediate executes non-call messages. It returns false if the message is a -// call or requires a reply. -func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { - start := time.Now() - switch { - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - return true +// handleResponse processes method call responses. +func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*jsonrpcMessage)) { + var resolvedops []*requestOp + handleResp := func(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] + if op == nil { + h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) + return } - return false - case msg.isResponse(): - h.handleResponse(msg) - h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) - return true - default: - return false + resolvedops = append(resolvedops, op) + delete(h.respWait, string(msg.ID)) + + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + if op.sub != nil { + if msg.Error != nil { + op.err = msg.Error + } else { + op.err = json.Unmarshal(msg.Result, &op.sub.subid) + if op.err == nil { + go op.sub.run() + h.clientSubs[op.sub.subid] = op.sub + } + } + } + + if !op.hadResponse { + op.hadResponse = true + op.resp <- batch + } + } + + for _, msg := range batch { + start := time.Now() + switch { + case msg.isResponse(): + handleResp(msg) + h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) + + case msg.isNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResult(msg) + continue + } + handleCall(msg) + + default: + handleCall(msg) + } + } + + for _, op := range resolvedops { + h.removeRequestOp(op) } } @@ -403,33 +444,6 @@ func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { } } -// handleResponse processes method call responses. -func (h *handler) handleResponse(msg *jsonrpcMessage) { - op := h.respWait[string(msg.ID)] - if op == nil { - h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) - return - } - delete(h.respWait, string(msg.ID)) - // For normal responses, just forward the reply to Call/BatchCall. - if op.sub == nil { - op.resp <- msg - return - } - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - defer close(op.resp) - if msg.Error != nil { - op.err = msg.Error - return - } - if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.run() - h.clientSubs[op.sub.subid] = op.sub - } -} - // handleCallMsg executes a call message and returns the answer. func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { start := time.Now() @@ -438,6 +452,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess h.handleCall(ctx, msg) h.log.Debug("Served "+msg.Method, "duration", time.Since(start)) return nil + case msg.isCall(): resp := h.handleCall(ctx, msg) var ctx []interface{} @@ -452,8 +467,10 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess h.log.Debug("Served "+msg.Method, ctx...) } return resp + case msg.hasValidID(): return msg.errorResponse(&invalidRequestError{"invalid request"}) + default: return errorMessage(&invalidRequestError{"invalid request"}) } @@ -473,12 +490,14 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if callb == nil { return msg.errorResponse(&methodNotFoundError{method: msg.Method}) } + args, err := parsePositionalArguments(msg.Params, callb.argTypes) if err != nil { return msg.errorResponse(&invalidParamsError{err.Error()}) } start := time.Now() answer := h.runMethod(cp.ctx, msg, callb, args) + // Collect the statistics for RPC calls if metrics is enabled. // We only care about pure rpc call. Filter out subscription. if callb != h.unsubscribeCb { @@ -491,6 +510,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage rpcServingTimer.UpdateSince(start) updateServeTimeHistogram(msg.Method, answer.Error == nil, time.Since(start)) } + return answer } diff --git a/rpc/http.go b/rpc/http.go index 723417f752c7..d9556667958c 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -176,11 +176,12 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e } defer respBody.Close() - var respmsg jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { + var resp jsonrpcMessage + batch := [1]*jsonrpcMessage{&resp} + if err := json.NewDecoder(respBody).Decode(&resp); err != nil { return err } - op.resp <- &respmsg + op.resp <- batch[:] return nil } @@ -191,16 +192,15 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr return err } defer respBody.Close() - var respmsgs []jsonrpcMessage + + var respmsgs []*jsonrpcMessage if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } if len(respmsgs) != len(msgs) { return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) } - for i := 0; i < len(respmsgs); i++ { - op.resp <- &respmsgs[i] - } + op.resp <- respmsgs return nil } diff --git a/rpc/server_test.go b/rpc/server_test.go index 80bffef02f9a..78b2fe1ab0d0 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -19,6 +19,7 @@ package rpc import ( "bufio" "bytes" + "errors" "io" "net" "os" @@ -172,21 +173,21 @@ func TestServerBatchResponseSizeLimit(t *testing.T) { t.Fatal("error sending batch:", err) } for i := range batch { - // We expect the first two queries to be ok, but after that the - // size limit hits. + // We expect the first two queries to be ok, but after that the size limit takes effect. if i < 2 { if batch[i].Error != nil { t.Fatalf("batch elem %d has unexpected error: %v", i, batch[i].Error) } continue } - // After two, we expect error + // After two, we expect an error. re, ok := batch[i].Error.(Error) if !ok { t.Fatalf("batch elem %d has wrong error: %v", i, batch[i].Error) } - if have, want := re.ErrorCode(), errcodeResponseTooLarge; have != want { - t.Errorf("batch elem %d wrong error code, have %d want %d", i, have, want) + wantedCode := errcodeResponseTooLarge + if re.ErrorCode() != wantedCode { + t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), wantedCode) } } } @@ -196,7 +197,27 @@ func TestServerBatchRequestLimit(t *testing.T) { defer server.Stop() server.SetBatchLimits(2, 100000) client := DialInProc(server) - if have, want := client.BatchCall(make([]BatchElem, 3)).Error(), errMsgBatchTooLarge; have != want { - t.Errorf("error mismatch, have %v want %v", have, want) + + batch := make([]BatchElem, 3) + err := client.BatchCall(batch) + if err != nil { + t.Fatal("unexpected error:", err) + } + + // Check that the first response indicates an error with batch size. + var err0 Error + if !errors.As(batch[0].Error, &err0) { + t.Fatalf("batch elem 0 has wrong error type: %T", batch[0].Error) + } else { + if err0.ErrorCode() != -32600 || err0.Error() != errMsgBatchTooLarge { + t.Fatalf("wrong error on batch elem zero: %v", err0) + } + } + + // Check that remaining response batch elements are reported as absent. + for i, elem := range batch[1:] { + if elem.Error != ErrMissingBatchResp { + t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) + } } } From 82b5208b5706e646fef241b80f46e8027b6ea65c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 8 Jun 2023 15:02:42 +0200 Subject: [PATCH 23/27] rpc: attach "batch too large" error to the first call In JSON-RPC, only method calls with a non-null "id" can be responded to. Since batches can contain non-call messages or notifications, the best effort thing we can do to report an error with the batch itself is the first method call message. --- rpc/client.go | 12 +++---- rpc/client_test.go | 37 ++++++++++++++++++++++ rpc/handler.go | 43 +++++++++++++++++--------- rpc/server_test.go | 32 +------------------ rpc/testdata/invalid-batch-toolarge.js | 13 ++++++++ rpc/testdata/invalid-batch.js | 2 ++ 6 files changed, 87 insertions(+), 52 deletions(-) create mode 100644 rpc/testdata/invalid-batch-toolarge.js diff --git a/rpc/client.go b/rpc/client.go index c8c956e44c7e..e8933bbd1d1c 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -457,16 +457,16 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { } delete(byID, string(resp.ID)) + // Assign result and error. elem := &b[index] - if resp.Error != nil { + switch { + case resp.Error != nil: elem.Error = resp.Error - continue - } - if len(resp.Result) == 0 { + case resp.Result == nil: elem.Error = ErrNoResult - continue + default: + elem.Error = json.Unmarshal(resp.Result, elem.Result) } - elem.Error = json.Unmarshal(resp.Result, elem.Result) } // Check that all expected responses have been received. diff --git a/rpc/client_test.go b/rpc/client_test.go index 1c12d867ee21..f104f00fcc27 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -218,6 +218,43 @@ func TestClientBatchRequest_len(t *testing.T) { }) } +// This checks that the client can handle the case where the server doesn't +// respond to all requests in a batch. +func TestClientBatchRequestLimit(t *testing.T) { + server := newTestServer() + defer server.Stop() + server.SetBatchLimits(2, 100000) + client := DialInProc(server) + + batch := []BatchElem{ + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, + } + err := client.BatchCall(batch) + if err != nil { + t.Fatal("unexpected error:", err) + } + + // Check that the first response indicates an error with batch size. + var err0 Error + if !errors.As(batch[0].Error, &err0) { + t.Log("error zero:", batch[0].Error) + t.Fatalf("batch elem 0 has wrong error type: %T", batch[0].Error) + } else { + if err0.ErrorCode() != -32600 || err0.Error() != errMsgBatchTooLarge { + t.Fatalf("wrong error on batch elem zero: %v", err0) + } + } + + // Check that remaining response batch elements are reported as absent. + for i, elem := range batch[1:] { + if elem.Error != ErrMissingBatchResp { + t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) + } + } +} + func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() diff --git a/rpc/handler.go b/rpc/handler.go index 63b149623e33..4f48c7931c65 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -164,16 +164,6 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR } } -// handleMsg handles a single non-batch message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - msgs := []*jsonrpcMessage{msg} - h.handleResponses(msgs, func(msg *jsonrpcMessage) { - h.startCallProc(func(cp *callProc) { - h.handleNonBatchCall(cp, msg) - }) - }) -} - // handleBatch executes all messages in a batch and returns the responses. func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: @@ -184,11 +174,10 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) return } - - if len(msgs) > h.batchRequestLimit && h.batchRequestLimit != 0 { + // Apply limit on total number of requests. + if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit { h.startCallProc(func(cp *callProc) { - resp := msgs[0].errorResponse(&invalidRequestError{errMsgBatchTooLarge}) - h.conn.writeJSON(cp.ctx, resp, true) + h.respondWithBatchTooLarge(cp, msgs) }) return } @@ -249,15 +238,39 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { if timer != nil { timer.Stop() } - callBuffer.write(cp.ctx, h.conn) h.addSubscriptions(cp.notifiers) + callBuffer.write(cp.ctx, h.conn) for _, n := range cp.notifiers { n.activate() } }) } +func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) { + resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge}) + // Find the first call and add its "id" field to the error. + // This is the best we can do, given that the protocol doesn't have a way + // of reporting an error for the entire batch. + for _, msg := range batch { + if msg.isCall() { + resp.ID = msg.ID + break + } + } + h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true) +} + +// handleMsg handles a single non-batch message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + msgs := []*jsonrpcMessage{msg} + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + h.startCallProc(func(cp *callProc) { + h.handleNonBatchCall(cp, msg) + }) + }) +} + func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { var ( responded sync.Once diff --git a/rpc/server_test.go b/rpc/server_test.go index 78b2fe1ab0d0..5d3929dfdc69 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -19,7 +19,6 @@ package rpc import ( "bufio" "bytes" - "errors" "io" "net" "os" @@ -71,6 +70,7 @@ func TestServer(t *testing.T) { func runTestScript(t *testing.T, file string) { server := newTestServer() + server.SetBatchLimits(4, 100000) content, err := os.ReadFile(file) if err != nil { t.Fatal(err) @@ -191,33 +191,3 @@ func TestServerBatchResponseSizeLimit(t *testing.T) { } } } - -func TestServerBatchRequestLimit(t *testing.T) { - server := newTestServer() - defer server.Stop() - server.SetBatchLimits(2, 100000) - client := DialInProc(server) - - batch := make([]BatchElem, 3) - err := client.BatchCall(batch) - if err != nil { - t.Fatal("unexpected error:", err) - } - - // Check that the first response indicates an error with batch size. - var err0 Error - if !errors.As(batch[0].Error, &err0) { - t.Fatalf("batch elem 0 has wrong error type: %T", batch[0].Error) - } else { - if err0.ErrorCode() != -32600 || err0.Error() != errMsgBatchTooLarge { - t.Fatalf("wrong error on batch elem zero: %v", err0) - } - } - - // Check that remaining response batch elements are reported as absent. - for i, elem := range batch[1:] { - if elem.Error != ErrMissingBatchResp { - t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) - } - } -} diff --git a/rpc/testdata/invalid-batch-toolarge.js b/rpc/testdata/invalid-batch-toolarge.js new file mode 100644 index 000000000000..218fea58aaac --- /dev/null +++ b/rpc/testdata/invalid-batch-toolarge.js @@ -0,0 +1,13 @@ +// This file checks the behavior of the batch item limit code. +// In tests, the batch item limit is set to 4. So to trigger the error, +// all batches in this file have 5 elements. + +// For batches that do not contain any calls, a response message with "id" == null +// is returned. + +--> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] +<-- [{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"batch too large"}}] + +// For batches with at least one call, the call's "id" is used. +--> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","id":3,"method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] +<-- [{"jsonrpc":"2.0","id":3,"error":{"code":-32600,"message":"batch too large"}}] diff --git a/rpc/testdata/invalid-batch.js b/rpc/testdata/invalid-batch.js index 768dbc837e95..911ec7ab132e 100644 --- a/rpc/testdata/invalid-batch.js +++ b/rpc/testdata/invalid-batch.js @@ -15,3 +15,5 @@ --> [{"jsonrpc":"2.0","id":1,"method":"test_echo","params":["foo",1]},55,{"jsonrpc":"2.0","id":2,"method":"unknown_method"},{"foo":"bar"}] <-- [{"jsonrpc":"2.0","id":1,"result":{"String":"foo","Int":1,"Args":null}},{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"invalid request"}},{"jsonrpc":"2.0","id":2,"error":{"code":-32601,"message":"the method unknown_method does not exist/is not available"}},{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"invalid request"}}] + + From f0688d665002558c0a4184a8830f2faa411b5963 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 8 Jun 2023 15:49:28 +0200 Subject: [PATCH 24/27] rpc: remove default limits I think adding these limits might break people's setups, so best to avoid configuring them by default in package rpc. --- cmd/clef/main.go | 2 +- cmd/utils/flags.go | 4 ++-- node/defaults.go | 24 +++++++++++++----------- rpc/client.go | 12 ------------ 4 files changed, 16 insertions(+), 26 deletions(-) diff --git a/cmd/clef/main.go b/cmd/clef/main.go index 959901d4137e..69909eead96e 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -732,7 +732,7 @@ func signer(c *cli.Context) error { cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name)) srv := rpc.NewServer() - srv.SetBatchLimits(utils.BatchRequestLimit.Value, utils.BatchResponseMaxSize.Value) + srv.SetBatchLimits(node.DefaultConfig.BatchRequestLimit, node.DefaultConfig.BatchResponseMaxSize) err := node.RegisterApis(rpcAPI, []string{"account"}, srv) if err != nil { utils.Fatalf("Could not register API: %w", err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 286b299eda24..104ab5dbd014 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -721,13 +721,13 @@ var ( BatchRequestLimit = &cli.IntFlag{ Name: "rpc.batch-request-limit", Usage: "Maximum number of requests in a batch", - Value: rpc.DefaultBatchRequestLimit, + Value: node.DefaultConfig.BatchRequestLimit, Category: flags.APICategory, } BatchResponseMaxSize = &cli.IntFlag{ Name: "rpc.batch-response-max-size", Usage: "Maximum number of bytes returned from a batched call", - Value: rpc.DefaultBatchResponseMaxSize, + Value: node.DefaultConfig.BatchResponseMaxSize, Category: flags.APICategory, } EnablePersonal = &cli.BoolFlag{ diff --git a/node/defaults.go b/node/defaults.go index fcfbc934bfd4..d8f718121e80 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -46,17 +46,19 @@ var ( // DefaultConfig contains reasonable default settings. var DefaultConfig = Config{ - DataDir: DefaultDataDir(), - HTTPPort: DefaultHTTPPort, - AuthAddr: DefaultAuthHost, - AuthPort: DefaultAuthPort, - AuthVirtualHosts: DefaultAuthVhosts, - HTTPModules: []string{"net", "web3"}, - HTTPVirtualHosts: []string{"localhost"}, - HTTPTimeouts: rpc.DefaultHTTPTimeouts, - WSPort: DefaultWSPort, - WSModules: []string{"net", "web3"}, - GraphQLVirtualHosts: []string{"localhost"}, + DataDir: DefaultDataDir(), + HTTPPort: DefaultHTTPPort, + AuthAddr: DefaultAuthHost, + AuthPort: DefaultAuthPort, + AuthVirtualHosts: DefaultAuthVhosts, + HTTPModules: []string{"net", "web3"}, + HTTPVirtualHosts: []string{"localhost"}, + HTTPTimeouts: rpc.DefaultHTTPTimeouts, + WSPort: DefaultWSPort, + WSModules: []string{"net", "web3"}, + BatchRequestLimit: 1000, + BatchResponseMaxSize: 25 * 1000 * 1000, + GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ ListenAddr: ":30303", MaxPeers: 50, diff --git a/rpc/client.go b/rpc/client.go index e8933bbd1d1c..f7188461e89a 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -47,12 +47,6 @@ const ( subscribeTimeout = 10 * time.Second // overall timeout eth_subscribe, rpc_modules calls ) -// Batch limits -const ( - DefaultBatchRequestLimit = 1000 // Maximum number of items in a batch. - DefaultBatchResponseMaxSize = 25 * 1000 * 1000 // Maximum number of bytes returned from calls. -) - const ( // Subscriptions are removed when the subscriber cannot keep up. // @@ -269,12 +263,6 @@ func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) if c.idgen == nil { c.idgen = randomIDGenerator() } - if c.batchItemLimit == 0 { - c.batchItemLimit = DefaultBatchRequestLimit - } - if c.batchResponseMaxSize == 0 { - c.batchResponseMaxSize = DefaultBatchResponseMaxSize - } // Launch the main loop. if !isHTTP { From cd73291fce1f01376c5ded8bcf46b2c567301ab6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 8 Jun 2023 19:46:54 +0200 Subject: [PATCH 25/27] rpc: remove added blank lines in invalid-batch.js --- rpc/testdata/invalid-batch.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/rpc/testdata/invalid-batch.js b/rpc/testdata/invalid-batch.js index 911ec7ab132e..768dbc837e95 100644 --- a/rpc/testdata/invalid-batch.js +++ b/rpc/testdata/invalid-batch.js @@ -15,5 +15,3 @@ --> [{"jsonrpc":"2.0","id":1,"method":"test_echo","params":["foo",1]},55,{"jsonrpc":"2.0","id":2,"method":"unknown_method"},{"foo":"bar"}] <-- [{"jsonrpc":"2.0","id":1,"result":{"String":"foo","Int":1,"Args":null}},{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"invalid request"}},{"jsonrpc":"2.0","id":2,"error":{"code":-32601,"message":"the method unknown_method does not exist/is not available"}},{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"invalid request"}}] - - From 7048bfc35b3a4f172a91c046bfa2c1b201facee9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 8 Jun 2023 20:17:51 +0200 Subject: [PATCH 26/27] rpc: remove special error handling for HTTP batch response length --- rpc/client_test.go | 58 ++++++++++++++++++++++++++++++++++------------ rpc/http.go | 3 --- 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/rpc/client_test.go b/rpc/client_test.go index f104f00fcc27..ebdb44c2a93d 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -187,33 +187,61 @@ func TestClientBatchRequest_len(t *testing.T) { })) t.Cleanup(s.Close) - client, err := Dial(s.URL) - if err != nil { - t.Fatal("failed to dial test server:", err) - } - defer client.Close() - t.Run("too-few", func(t *testing.T) { + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + batch := []BatchElem{ - {Method: "foo"}, - {Method: "bar"}, - {Method: "baz"}, + {Method: "foo", Result: new(string)}, + {Method: "bar", Result: new(string)}, + {Method: "baz", Result: new(string)}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { - t.Errorf("expected %q but got: %v", ErrBadResult, err) + + if err := client.BatchCallContext(ctx, batch); err != nil { + t.Fatal("error:", err) + } + for i, elem := range batch[:2] { + if elem.Error != nil { + t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) + } + } + for i, elem := range batch[2:] { + if elem.Error != ErrMissingBatchResp { + t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) + } } }) t.Run("too-many", func(t *testing.T) { + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + batch := []BatchElem{ - {Method: "foo"}, + {Method: "foo", Result: new(string)}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { - t.Errorf("expected %q but got: %v", ErrBadResult, err) + + if err := client.BatchCallContext(ctx, batch); err != nil { + t.Fatal("error:", err) + } + for i, elem := range batch[:1] { + if elem.Error != nil { + t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) + } + } + for i, elem := range batch[1:] { + if elem.Error != ErrMissingBatchResp { + t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) + } } }) } @@ -349,7 +377,7 @@ func testClientCancel(transport string, t *testing.T) { _, hasDeadline := ctx.Deadline() t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline) // default: - // t.Logf("got expected error with %v wait time: %v", timeout, err) + // t.Logf("got expected error with %v wait time: %v", timeout, err) } cancel() } diff --git a/rpc/http.go b/rpc/http.go index d9556667958c..741fa1c0eb4f 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -197,9 +197,6 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - if len(respmsgs) != len(msgs) { - return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) - } op.resp <- respmsgs return nil } From 6841858f7434b7b08ddec0e8656a47c50294e856 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 9 Jun 2023 11:33:14 +0200 Subject: [PATCH 27/27] rpc: rename error --- rpc/client.go | 6 +++--- rpc/client_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index f7188461e89a..c3114ef1d20f 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -34,8 +34,8 @@ import ( var ( ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") - ErrNoResult = errors.New("no result in JSON-RPC response") - ErrMissingBatchResp = errors.New("response batch did not contain a result for this call") + ErrNoResult = errors.New("JSON-RPC response has no result") + ErrMissingBatchResponse = errors.New("response batch did not contain a response to this call") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") errClientReconnected = errors.New("client reconnected") errDead = errors.New("connection lost") @@ -460,7 +460,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { // Check that all expected responses have been received. for _, index := range byID { elem := &b[index] - elem.Error = ErrMissingBatchResp + elem.Error = ErrMissingBatchResponse } return err diff --git a/rpc/client_test.go b/rpc/client_test.go index ebdb44c2a93d..7c96b2d6667b 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -211,7 +211,7 @@ func TestClientBatchRequest_len(t *testing.T) { } } for i, elem := range batch[2:] { - if elem.Error != ErrMissingBatchResp { + if elem.Error != ErrMissingBatchResponse { t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) } } @@ -239,7 +239,7 @@ func TestClientBatchRequest_len(t *testing.T) { } } for i, elem := range batch[1:] { - if elem.Error != ErrMissingBatchResp { + if elem.Error != ErrMissingBatchResponse { t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) } } @@ -277,7 +277,7 @@ func TestClientBatchRequestLimit(t *testing.T) { // Check that remaining response batch elements are reported as absent. for i, elem := range batch[1:] { - if elem.Error != ErrMissingBatchResp { + if elem.Error != ErrMissingBatchResponse { t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) } }