Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-lru-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
nozim committed Aug 23, 2023
2 parents 689ac6d + 8e4b0fc commit c2938e4
Show file tree
Hide file tree
Showing 28 changed files with 1,320 additions and 1,370 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ go-math-rand-check:
# `exclude` should only specify non production code (test, bench..).
# If this check fails, try updating your code by using:
# - "crypto/rand" or "flow-go/utils/rand" for non-deterministic randomness
# - "flow-go/crypto/random" for deterministic randomness
# - "flow-go/crypto/random" for deterministic randomness
grep --include=\*.go \
--exclude=*test* --exclude=*helper* --exclude=*example* --exclude=*fixture* --exclude=*benchmark* --exclude=*profiler* \
--exclude-dir=*test* --exclude-dir=*helper* --exclude-dir=*example* --exclude-dir=*fixture* --exclude-dir=*benchmark* --exclude-dir=*profiler* -rnw '"math/rand"'; \
Expand Down Expand Up @@ -194,6 +194,8 @@ generate-mocks: install-mock-generators
mockery --name 'API' --dir="./engine/protocol" --case=underscore --output="./engine/protocol/mock" --outpkg="mock"
mockery --name '.*' --dir="./engine/access/state_stream" --case=underscore --output="./engine/access/state_stream/mock" --outpkg="mock"
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"

mockery --name '.*' --dir=model/fingerprint --case=underscore --output="./model/fingerprint/mock" --outpkg="mock"
mockery --name 'ExecForkActor' --structname 'ExecForkActorMock' --dir=module/mempool/consensus/mock/ --case=underscore --output="./module/mempool/consensus/mock/" --outpkg="mock"
mockery --name '.*' --dir=engine/verification/fetcher/ --case=underscore --output="./engine/verification/fetcher/mock" --outpkg="mockfetcher"
Expand Down
59 changes: 32 additions & 27 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/go-bitswap"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/go-bitswap"

"github.com/onflow/flow-go/admin/commands"
stateSyncCommands "github.com/onflow/flow-go/admin/commands/state_synchronization"
storageCommands "github.com/onflow/flow-go/admin/commands/storage"
Expand Down Expand Up @@ -131,6 +130,7 @@ type AccessNodeConfig struct {
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -207,6 +207,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
RetryDelay: edrequester.DefaultRetryDelay,
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
TxResultCacheSize: 0,
}
}

Expand Down Expand Up @@ -720,6 +721,8 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.DurationVar(&builder.stateStreamConf.ClientSendTimeout, "state-stream-send-timeout", defaultConfig.stateStreamConf.ClientSendTimeout, "maximum wait before timing out while sending a response to a streaming client e.g. 30s")
flags.UintVar(&builder.stateStreamConf.ClientSendBufferSize, "state-stream-send-buffer-size", defaultConfig.stateStreamConf.ClientSendBufferSize, "maximum number of responses to buffer within a stream")
flags.Float64Var(&builder.stateStreamConf.ResponseLimit, "state-stream-response-limit", defaultConfig.stateStreamConf.ResponseLimit, "max number of responses per second to send over streaming endpoints. this helps manage resources consumed by each client querying data not in the cache e.g. 3 or 0.5. 0 means no limit")

flags.UintVar(&builder.TxResultCacheSize, "transaction-result-cache-size", defaultConfig.TxResultCacheSize, "transaction result cache size.(Disabled by default i.e 0)")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down Expand Up @@ -1091,28 +1094,30 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
),
}

backend := backend.New(
node.State,
builder.CollectionRPC,
builder.HistoricalAccessRPCs,
node.Storage.Blocks,
node.Storage.Headers,
node.Storage.Collections,
node.Storage.Transactions,
node.Storage.Receipts,
node.Storage.Results,
node.RootChainID,
builder.AccessMetrics,
connFactory,
builder.retryEnabled,
backendConfig.MaxHeightRange,
backendConfig.PreferredExecutionNodeIDs,
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList,
backendConfig.ScriptExecValidation,
backendConfig.CircuitBreakerConfig.Enabled)
nodeBackend := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
ArchiveAddressList: backendConfig.ArchiveAddressList,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
ScriptExecValidation: backendConfig.ScriptExecValidation,
TxResultCacheSize: builder.TxResultCacheSize,
})

engineBuilder, err := rpc.NewBuilder(
node.Logger,
Expand All @@ -1122,8 +1127,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.AccessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
backend,
backend,
nodeBackend,
nodeBackend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
Expand Down
52 changes: 26 additions & 26 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/rs/zerolog"
"github.com/spf13/pflag"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff"
Expand Down Expand Up @@ -73,6 +70,8 @@ import (
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/onflow/flow-go/utils/io"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
)

// ObserverBuilder extends cmd.NodeBuilder and declares additional functions needed to bootstrap an Access node
Expand Down Expand Up @@ -925,29 +924,30 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
backendConfig.CircuitBreakerConfig,
),
}

accessBackend := backend.New(
node.State,
nil,
nil,
node.Storage.Blocks,
node.Storage.Headers,
node.Storage.Collections,
node.Storage.Transactions,
node.Storage.Receipts,
node.Storage.Results,
node.RootChainID,
accessMetrics,
connFactory,
false,
backendConfig.MaxHeightRange,
backendConfig.PreferredExecutionNodeIDs,
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList,
backendConfig.ScriptExecValidation,
backendConfig.CircuitBreakerConfig.Enabled)



accessBackend := backend.New(backend.Params{
State:node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log:node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
ArchiveAddressList: backendConfig.ArchiveAddressList,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
ScriptExecValidation: backendConfig.ScriptExecValidation,
})

observerCollector := metrics.NewObserverCollector()
restHandler, err := restapiproxy.NewRestProxyHandler(
Expand Down
Loading

0 comments on commit c2938e4

Please sign in to comment.