Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Enable grpc compression #4804

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
ReadTimeout: rest.DefaultReadTimeout,
IdleTimeout: rest.DefaultIdleTimeout,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
},
stateStreamConf: state_stream.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -870,6 +871,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.BackendConfig.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringVar(&builder.rpcConf.CompressorName, "grpc-compressor", defaultConfig.rpcConf.CompressorName, "name of grpc compressor that will be used for requests to other nodes. One of (gzip, snappy, deflate)")
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed")
flags.BoolVar(&builder.logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed")
Expand Down Expand Up @@ -1241,6 +1243,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
accessMetrics,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
config.CompressorName,
),
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
ReadTimeout: rest.DefaultReadTimeout,
IdleTimeout: rest.DefaultIdleTimeout,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
},
rpcMetricsEnabled: false,
apiRatelimits: nil,
Expand Down Expand Up @@ -919,6 +920,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
accessMetrics,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
config.CompressorName,
),
}

Expand Down
13 changes: 13 additions & 0 deletions engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"pgregory.net/rapid"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -49,6 +50,7 @@ func TestProxyAccessAPI(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

proxyConnectionFactory := ProxyConnectionFactory{
Expand Down Expand Up @@ -99,6 +101,7 @@ func TestProxyExecutionAPI(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

proxyConnectionFactory := ProxyConnectionFactory{
Expand Down Expand Up @@ -143,6 +146,7 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

proxyConnectionFactory := ProxyConnectionFactory{
Expand Down Expand Up @@ -194,6 +198,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

proxyConnectionFactory := ProxyConnectionFactory{
Expand Down Expand Up @@ -252,6 +257,7 @@ func TestExecutionNodeClientTimeout(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

// create the execution API client
Expand Down Expand Up @@ -298,6 +304,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

// create the collection API client
Expand Down Expand Up @@ -345,6 +352,7 @@ func TestConnectionPoolFull(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

cn1Address := "foo1:123"
Expand Down Expand Up @@ -419,6 +427,7 @@ func TestConnectionPoolStale(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

proxyConnectionFactory := ProxyConnectionFactory{
Expand Down Expand Up @@ -506,6 +515,7 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

clientAddress := en.listener.Addr().String()
Expand Down Expand Up @@ -605,6 +615,7 @@ func TestEvictingCacheClients(t *testing.T) {
connectionFactory.AccessMetrics,
0,
CircuitBreakerConfig{},
grpcutils.NoCompressor,
)

clientAddress := cn.listener.Addr().String()
Expand Down Expand Up @@ -831,6 +842,7 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
MaxRequests: 1,
RestoreTimeout: circuitBreakerRestoreTimeout,
},
grpcutils.NoCompressor,
)

// Set metrics reporting.
Expand Down Expand Up @@ -916,6 +928,7 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
MaxRequests: 1,
RestoreTimeout: circuitBreakerRestoreTimeout,
},
grpcutils.NoCompressor,
)

// Set metrics reporting.
Expand Down
122 changes: 122 additions & 0 deletions engine/access/rpc/connection/grpc_compression_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package connection

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
testifymock "github.com/stretchr/testify/mock"

"google.golang.org/grpc/encoding/gzip"

"github.com/onflow/flow-go/engine/common/grpc/compressor/deflate"
"github.com/onflow/flow-go/engine/common/grpc/compressor/snappy"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/onflow/flow-go/utils/unittest"

"github.com/onflow/flow/protobuf/go/flow/execution"
)

// BenchmarkWithGzipCompression benchmarks the gRPC request to execution nodes using gzip compressor.
func BenchmarkWithGzipCompression(b *testing.B) {
runBenchmark(b, gzip.Name)
}

// BenchmarkWithSnappyCompression benchmarks the gRPC request to execution nodes using snappy compressor.
func BenchmarkWithSnappyCompression(b *testing.B) {
runBenchmark(b, snappy.Name)
}

// BenchmarkWithDeflateCompression benchmarks the gRPC request to execution nodes using deflate compressor.
func BenchmarkWithDeflateCompression(b *testing.B) {
runBenchmark(b, deflate.Name)
}

// runBenchmark is a helper function that performs the benchmarking for different compressors.
func runBenchmark(b *testing.B, compressorName string) {
// create an execution node
en := new(executionNode)
en.start(b)
defer en.stop(b)

blockHeaders := getHeaders(5)
exeResults := make([]*execution.GetEventsForBlockIDsResponse_Result, len(blockHeaders))
for i := 0; i < len(blockHeaders); i++ {
exeResults[i] = &execution.GetEventsForBlockIDsResponse_Result{
BlockId: convert.IdentifierToMessage(blockHeaders[i].ID()),
BlockHeight: blockHeaders[i].Height,
Events: convert.EventsToMessages(getEvents(10)),
}
}
expectedEventsResponse := &execution.GetEventsForBlockIDsResponse{
Results: exeResults,
}

blockIDs := make([]flow.Identifier, len(blockHeaders))
for i, header := range blockHeaders {
blockIDs[i] = header.ID()
}
eventsReq := &execution.GetEventsForBlockIDsRequest{
BlockIds: convert.IdentifiersToMessages(blockIDs),
Type: string(flow.EventAccountCreated),
}

en.handler.On("GetEventsForBlockIDs", testifymock.Anything, testifymock.Anything).
Return(expectedEventsResponse, nil)

// create the factory
connectionFactory := new(ConnectionFactoryImpl)
// set the execution grpc port
connectionFactory.ExecutionGRPCPort = en.port

// set metrics reporting
connectionFactory.AccessMetrics = metrics.NewNoopCollector()
connectionFactory.Manager = NewManager(
nil,
unittest.Logger(),
connectionFactory.AccessMetrics,
grpcutils.DefaultMaxMsgSize,
CircuitBreakerConfig{},
compressorName,
)

proxyConnectionFactory := ProxyConnectionFactory{
ConnectionFactory: connectionFactory,
targetAddress: en.listener.Addr().String(),
}

// get an execution API client
client, _, err := proxyConnectionFactory.GetExecutionAPIClient("foo")
assert.NoError(b, err)

ctx := context.Background()
b.ResetTimer()
// make the call to the execution node
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < b.N; i++ {
_, err := client.GetEventsForBlockIDs(ctx, eventsReq)
assert.NoError(b, err)
}
}

// getEvents generates a slice of flow events with a specified length.
func getEvents(n int) []flow.Event {
events := make([]flow.Event, n)
for i := range events {
events[i] = flow.Event{Type: flow.EventAccountCreated}
}
return events
}

// getHeaders generates a slice of flow headers with a specified length.
func getHeaders(n int) []*flow.Header {
headers := make([]*flow.Header, n)
for i := range headers {
b := unittest.BlockFixture()
headers[i] = b.Header

}
return headers
}
22 changes: 18 additions & 4 deletions engine/access/rpc/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/encoding/gzip" //required for gRPC compression
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" //required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" //required for gRPC compression
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/utils/grpcutils"
)

// DefaultClientTimeout is used when making a GRPC request to a collection node or an execution node.
Expand All @@ -43,6 +47,7 @@ type Manager struct {
metrics module.AccessMetrics
maxMsgSize uint
circuitBreakerConfig CircuitBreakerConfig
compressorName string
}

// CircuitBreakerConfig is a configuration struct for the circuit breaker.
Expand All @@ -66,13 +71,15 @@ func NewManager(
metrics module.AccessMetrics,
maxMsgSize uint,
circuitBreakerConfig CircuitBreakerConfig,
compressorName string,
) Manager {
return Manager{
cache: cache,
logger: logger,
metrics: metrics,
maxMsgSize: maxMsgSize,
circuitBreakerConfig: circuitBreakerConfig,
compressorName: compressorName,
}
}

Expand Down Expand Up @@ -203,12 +210,19 @@ func (m *Manager) createConnection(address string, timeout time.Duration, cached
// The connections should be safe to be persisted and reused.
// https://pkg.go.dev/google.golang.org/grpc#WithKeepaliveParams
// https://grpc.io/blog/grpc-on-http2/#keeping-connections-alive
var opts []grpc.DialOption
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(m.maxMsgSize))))
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts, grpc.WithKeepaliveParams(keepaliveParams))
opts = append(opts, grpc.WithChainUnaryInterceptor(connInterceptors...))

if m.compressorName != grpcutils.NoCompressor {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(m.compressorName)))
}

conn, err := grpc.Dial(
address,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(m.maxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepaliveParams),
grpc.WithChainUnaryInterceptor(connInterceptors...),
opts...,
)
if err != nil {
return nil, fmt.Errorf("failed to connect to address %s: %w", address, err)
Expand Down
Loading
Loading