Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement gRPC streaming endpoint SubscribeBlocks #5307

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
8e24fde
Merged with master
UlyanaAndrukhiv Dec 19, 2023
a2f4e9c
Replaced protobuf version to new one
UlyanaAndrukhiv Dec 21, 2023
7f8356f
Implemented sealed blocks retriving
Guitarheroua Dec 27, 2023
118361f
Updated protobuf, added tests for finalized subscribe blocks
UlyanaAndrukhiv Dec 27, 2023
f6f2a16
Merged with stream-sealed
UlyanaAndrukhiv Dec 27, 2023
9f5e399
Refactored structs and naming
UlyanaAndrukhiv Dec 27, 2023
20e7d23
Make universal functions in for block watcher
Guitarheroua Dec 27, 2023
31d3f1e
Added checks for block status
UlyanaAndrukhiv Dec 27, 2023
69b04f1
Merge branch 'merge-stream-sealed' of github.com:Guitarheroua/flow-go…
UlyanaAndrukhiv Dec 27, 2023
ab097a6
Added test for subscribe blocks backend
UlyanaAndrukhiv Jan 3, 2024
c93df5b
Added more comments to subscribe blocks impl
UlyanaAndrukhiv Jan 4, 2024
d84b1d4
Updated subscriptionOnFinalizedBlock process
UlyanaAndrukhiv Jan 29, 2024
7ba6f9d
Updated protobuf
UlyanaAndrukhiv Jan 29, 2024
aa09fb1
Updated protobuf module
UlyanaAndrukhiv Jan 29, 2024
a7358b0
Reverted back unnecessary changes
UlyanaAndrukhiv Jan 29, 2024
5399dd3
Updated according to comments
UlyanaAndrukhiv Jan 30, 2024
44ebeb8
Added godoc for GetStartHeight
UlyanaAndrukhiv Jan 30, 2024
a9d99a1
Removed default block status for subscribe blocks
UlyanaAndrukhiv Jan 30, 2024
8d3af7e
Merged with master
UlyanaAndrukhiv Jan 30, 2024
98daa77
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Jan 30, 2024
579416a
Merged with master
UlyanaAndrukhiv Jan 30, 2024
ceebdcb
Fixed rpc backend tests, mocked seal head state when rpc backend creates
UlyanaAndrukhiv Jan 31, 2024
a168107
Renamed BlockWatcher to ChainStateTracker, added interface for it, ad…
UlyanaAndrukhiv Feb 6, 2024
79c9386
Updated generate-mocks
UlyanaAndrukhiv Feb 6, 2024
ecfa3bb
Updated version of flow-emulator
UlyanaAndrukhiv Feb 7, 2024
33d2526
Merged with master
UlyanaAndrukhiv Feb 14, 2024
e66c9cc
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 14, 2024
0f2c249
Fixed dependencies
UlyanaAndrukhiv Feb 14, 2024
d06382b
Added updated protobuf for insecure
UlyanaAndrukhiv Feb 14, 2024
5a6c4d0
refactored creating ChainStateTracker, updated tests
UlyanaAndrukhiv Feb 16, 2024
536509a
Reverted back changes in backend_test
UlyanaAndrukhiv Feb 16, 2024
d07bc45
Updated last commit
UlyanaAndrukhiv Feb 16, 2024
52a1579
Merged with master
UlyanaAndrukhiv Feb 16, 2024
2bd3a79
Updated test
UlyanaAndrukhiv Feb 16, 2024
254aa51
Reverted back unnecessary changes
UlyanaAndrukhiv Feb 16, 2024
e2460d0
Implemented SubscribeBlockHeaders and SubscribeBlockDigests
UlyanaAndrukhiv Feb 20, 2024
ae56423
Refactored block and block headers subscribes
UlyanaAndrukhiv Feb 20, 2024
022e50e
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 20, 2024
6c04830
Added more comments, updated backendSubscribeBlocks
UlyanaAndrukhiv Feb 20, 2024
2fdcbb7
Updated protobuf version
UlyanaAndrukhiv Feb 22, 2024
9c6ac0b
Moved index package to engine/access/index
UlyanaAndrukhiv Feb 22, 2024
cd8e6f9
Updated godoc, logs, errors andtests according to comments
UlyanaAndrukhiv Feb 22, 2024
03052b6
Added getBlockHeader method to avoid extra db lookup
UlyanaAndrukhiv Feb 22, 2024
6746499
Merged with master
UlyanaAndrukhiv Feb 22, 2024
c194f8d
Added irrecoverable handling errors for GetStartHeight
UlyanaAndrukhiv Feb 23, 2024
d19ed31
Removed multierror according to comment
UlyanaAndrukhiv Feb 23, 2024
cc46ed2
Updated errors
UlyanaAndrukhiv Feb 23, 2024
4557080
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 23, 2024
1578064
Updated comment in chain_state_tracker.go
UlyanaAndrukhiv Feb 27, 2024
2c2ca8c
Renamed LightweightBlock structure to BlockDigest
UlyanaAndrukhiv Feb 27, 2024
76645bf
Updated handling errors on backend stream blocks
UlyanaAndrukhiv Feb 27, 2024
91f1663
Updated access handler according to commits
UlyanaAndrukhiv Feb 27, 2024
3489349
Merge branch 'master' of github.com:UlyanaAndrukhiv/flow-go into Ulya…
UlyanaAndrukhiv Feb 27, 2024
b629916
Merged with master
UlyanaAndrukhiv Feb 27, 2024
f7388fb
Split chain state tracker into multiple trackers, updated tests
UlyanaAndrukhiv Feb 27, 2024
2c19e2e
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 27, 2024
422329e
Updated handler and protobuf
UlyanaAndrukhiv Feb 28, 2024
f1e66ac
Updated flow-go emulator version
UlyanaAndrukhiv Feb 28, 2024
0c6aa2e
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 28, 2024
c1530f2
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 28, 2024
1676f44
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 28, 2024
b76bdbb
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
franklywatson Feb 28, 2024
f193119
Merged with master, moved transaction result index to index folder
UlyanaAndrukhiv Feb 28, 2024
528f267
Linted
UlyanaAndrukhiv Feb 28, 2024
0698b7c
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Feb 29, 2024
7e0bd39
Updated protobuf version
UlyanaAndrukhiv Feb 29, 2024
a3bbe3b
Updated godoc and errors according to comments
UlyanaAndrukhiv Feb 29, 2024
3b8d311
Added more godoc, updated according to comments
UlyanaAndrukhiv Feb 29, 2024
0d913b8
Updated namimg in tests, updated block tracker constructor
UlyanaAndrukhiv Feb 29, 2024
c7eb4d1
Refactored trackers
UlyanaAndrukhiv Feb 29, 2024
1115cfa
Removed unused variable
UlyanaAndrukhiv Feb 29, 2024
d5d812a
Updated godoc to trackers, added fixed according to comments
UlyanaAndrukhiv Feb 29, 2024
61323b8
Updated godoc
UlyanaAndrukhiv Feb 29, 2024
7f9b5bc
Added rest of updates according to comments
UlyanaAndrukhiv Mar 1, 2024
0548595
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 1, 2024
593ff54
Updated flow-emulator version
UlyanaAndrukhiv Mar 1, 2024
ae3b494
Merge branch 'UlyanaAndrukhiv/subscribe-blocks' of github.com:UlyanaA…
UlyanaAndrukhiv Mar 1, 2024
139f082
Updated protobuf for integration tests
UlyanaAndrukhiv Mar 1, 2024
2b7b592
First batch of updates according to comments
UlyanaAndrukhiv Mar 5, 2024
4f592a7
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 5, 2024
0b658f0
Moved broadcaster from state_stream engine to executionDataTracker ac…
UlyanaAndrukhiv Mar 5, 2024
af480e7
Updated protobuf and access handler according to comments
UlyanaAndrukhiv Mar 5, 2024
509238e
Refactoredd handler for subscriptions, updated BlockDigest according …
UlyanaAndrukhiv Mar 5, 2024
11c944f
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
franklywatson Mar 5, 2024
915da03
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 6, 2024
3cb9be2
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 6, 2024
3f1e02e
Updated flow-emulator version
UlyanaAndrukhiv Mar 7, 2024
365b642
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 7, 2024
3e6563e
Added batch first of updates according to comments
UlyanaAndrukhiv Mar 8, 2024
d4fcbc6
Refactored handler.go according to suggested comment
UlyanaAndrukhiv Mar 8, 2024
852246a
Updated access api, according to comment, updated tests, moved common…
UlyanaAndrukhiv Mar 11, 2024
41248d5
Splitted handling the start block, updated godoc, updated tests
UlyanaAndrukhiv Mar 11, 2024
9020ed7
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 11, 2024
cf65415
Generated mocks
UlyanaAndrukhiv Mar 11, 2024
2112249
Merge branch 'UlyanaAndrukhiv/subscribe-blocks' of github.com:UlyanaA…
UlyanaAndrukhiv Mar 11, 2024
fa2a1eb
Updated flow-emulator version
UlyanaAndrukhiv Mar 11, 2024
4f20305
Merged with master, added execution data tracker to observer
UlyanaAndrukhiv Mar 13, 2024
4bed022
Updated according to comments
UlyanaAndrukhiv Mar 13, 2024
a1241bd
Removed OnExecutionData from state stream engine
UlyanaAndrukhiv Mar 13, 2024
2f8254d
Moved fatal log inside execution data tracker, removed unnecessary Ge…
UlyanaAndrukhiv Mar 13, 2024
9675f59
Fixed error code in godoc
UlyanaAndrukhiv Mar 13, 2024
ffe008c
Merge branch 'master' into UlyanaAndrukhiv/subscribe-blocks
UlyanaAndrukhiv Mar 13, 2024
39d6307
Updated according to last comment
UlyanaAndrukhiv Mar 13, 2024
4b23c0a
Updated flow-emulator
UlyanaAndrukhiv Mar 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"

"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
)
Expand Down Expand Up @@ -52,6 +53,10 @@ type API interface {

GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionResult, error)
GetExecutionResultByID(ctx context.Context, id flow.Identifier) (*flow.ExecutionResult, error)

// SubscribeBlocks subscribes to the blocks starting from a specific block ID and block height, with an optional block status.
// By default, returns sealed blocks.
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
SubscribeBlocks(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, blockStatus flow.BlockStatus) subscription.Subscription
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
83 changes: 82 additions & 1 deletion access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package access

import (
"context"
"fmt"
"sync/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
Expand All @@ -17,6 +21,7 @@ import (
)

type Handler struct {
subscription.StreamingData
api API
chain flow.Chain
signerIndicesDecoder hotstuff.BlockSignerDecoder
Expand All @@ -29,8 +34,17 @@ type HandlerOption func(*Handler)

var _ access.AccessAPIServer = (*Handler)(nil)

func NewHandler(api API, chain flow.Chain, finalizedHeader module.FinalizedHeaderCache, me module.Local, options ...HandlerOption) *Handler {
func NewHandler(api API,
chain flow.Chain,
finalizedHeader module.FinalizedHeaderCache,
me module.Local,
maxStreams uint32,
options ...HandlerOption) *Handler {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
h := &Handler{
StreamingData: subscription.StreamingData{
MaxStreams: int32(maxStreams),
StreamCount: atomic.Int32{},
},
api: api,
chain: chain,
finalizedHeaderCache: finalizedHeader,
Expand Down Expand Up @@ -701,6 +715,66 @@ func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExe
}, nil
}

// SubscribeBlocks handles subscription requests for blocks.
// It takes a SubscribeBlocksRequest and an AccessAPI_SubscribeBlocksServer stream as input.
// The handler manages the subscription to block updates and sends the subscribed block information
// to the client via the provided stream.
func (h *Handler) SubscribeBlocks(request *access.SubscribeBlocksRequest, stream access.AccessAPI_SubscribeBlocksServer) error {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
// check if the maximum number of streams is reached
if h.StreamCount.Load() >= h.MaxStreams {
return status.Errorf(codes.ResourceExhausted, "maximum number of streams reached")
}
h.StreamCount.Add(1)
defer h.StreamCount.Add(-1)

startBlockID := flow.ZeroID
if request.GetStartBlockId() != nil {
blockID, err := convert.BlockID(request.GetStartBlockId())
if err != nil {
return status.Errorf(codes.InvalidArgument, "could not convert start block ID: %v", err)
}
startBlockID = blockID
}

blockStatus := convert.MessageToBlockStatus(request.BlockStatus)
err := checkBlockStatus(blockStatus)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid block status argument: %v", err)
}

sub := h.api.SubscribeBlocks(stream.Context(), startBlockID, request.GetStartBlockHeight(), blockStatus)
for {
v, ok := <-sub.Channel()
if !ok {
if sub.Err() != nil {
return rpc.ConvertError(sub.Err(), "stream encountered an error", codes.Internal)
}
return nil
}

blockResp, ok := v.(*flow.Block)
if !ok {
return status.Errorf(codes.Internal, "unexpected response type: %T", v)
}

resp, err := h.blockResponse(blockResp, request.GetFullBlockResponse(), blockStatus)
if err != nil {
return rpc.ConvertError(err, "could not convert block to message", codes.Internal)
}

err = stream.Send(&access.SubscribeBlocksResponse{
Block: resp.Block,
})
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}
}
}

func (h *Handler) SendAndSubscribeTransactionStatuses(request *access.SendAndSubscribeTransactionStatusesRequest, stream access.AccessAPI_SendAndSubscribeTransactionStatusesServer) error {
panic("not implemented!")
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
metadata := h.buildMetadataResponse()

Expand Down Expand Up @@ -777,3 +851,10 @@ func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) fu
handler.signerIndicesDecoder = signerIndicesDecoder
}
}

func checkBlockStatus(blockStatus flow.BlockStatus) error {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
if blockStatus == flow.BlockStatusUnknown {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("block status is unlnown. Possible variants: BLOCK_FINALIZED, BLOCK_SEALED")
}
return nil
}
18 changes: 18 additions & 0 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 28 additions & 11 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
Expand Down Expand Up @@ -84,7 +85,7 @@ import (
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2pnet"
"github.com/onflow/flow-go/network/p2p/subscription"
networkingsubscription "github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/translator"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
relaynet "github.com/onflow/flow-go/network/relay"
Expand Down Expand Up @@ -147,6 +148,7 @@ type AccessNodeConfig struct {
registersDBPath string
checkpointFile string
scriptExecutorConfig query.QueryConfig
broadcaster *engine.Broadcaster
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -195,14 +197,14 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
},
stateStreamConf: statestreambackend.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
ExecutionDataCacheSize: state_stream.DefaultCacheSize,
ClientSendTimeout: state_stream.DefaultSendTimeout,
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams,
ExecutionDataCacheSize: subscription.DefaultCacheSize,
ClientSendTimeout: subscription.DefaultSendTimeout,
ClientSendBufferSize: subscription.DefaultSendBufferSize,
MaxGlobalStreams: subscription.DefaultMaxGlobalStreams,
EventFilterConfig: state_stream.DefaultEventFilterConfig,
ResponseLimit: state_stream.DefaultResponseLimit,
HeartbeatInterval: state_stream.DefaultHeartbeatInterval,
RegisterIDsRequestLimit: state_stream.DefaultRegisterIDsRequestLimit,
ResponseLimit: subscription.DefaultResponseLimit,
HeartbeatInterval: subscription.DefaultHeartbeatInterval,
},
stateStreamFilterConf: nil,
ExecutionNodeAddress: "localhost:9000",
Expand Down Expand Up @@ -236,6 +238,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"),
checkpointFile: cmd.NotSet,
scriptExecutorConfig: query.NewDefaultConfig(),
broadcaster: nil,
}
}

Expand Down Expand Up @@ -811,7 +814,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}
broadcaster := engine.NewBroadcaster()
builder.broadcaster = engine.NewBroadcaster()
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved

builder.stateStreamBackend, err = statestreambackend.New(
node.Logger,
Expand All @@ -822,7 +825,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
node.Storage.Results,
builder.ExecutionDataStore,
executionDataStoreCache,
broadcaster,
builder.broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore)
Expand All @@ -838,7 +841,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
node.RootChainID,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
broadcaster,
builder.broadcaster,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -1439,6 +1442,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
}

highestAvailableHeight, err := builder.ExecutionDataRequester.HighestConsecutiveHeight()
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}

nodeBackend, err := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
Expand All @@ -1463,6 +1471,15 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
SubscriptionParams: backend.SubscriptionParams{
Broadcaster: builder.broadcaster,
SendTimeout: builder.stateStreamConf.ClientSendTimeout,
ResponseLimit: builder.stateStreamConf.ResponseLimit,
SendBufferSize: int(builder.stateStreamConf.ClientSendBufferSize),
RootHeight: builder.executionDataConfig.InitialBlockHeight,
HighestAvailableHeight: highestAvailableHeight,
Seals: node.Storage.Seals,
},
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down Expand Up @@ -1725,7 +1742,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetBasicResolver(builder.Resolver).
SetSubscriptionFilter(subscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
SetSubscriptionFilter(networkingsubscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
SetConnectionManager(connManager).
SetRoutingSystem(func(ctx context.Context, h host.Host) (routing.Routing, error) {
return dht.NewDHT(ctx, h, protocols.FlowPublicDHTProtocolID(builder.SporkID), builder.Logger, networkMetrics, dht.AsServer())
Expand Down
10 changes: 6 additions & 4 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
accessmock "github.com/onflow/flow-go/engine/access/mock"
"github.com/onflow/flow-go/engine/access/rpc/backend"
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/factory"
Expand Down Expand Up @@ -169,6 +170,7 @@ func (suite *Suite) RunTest(
suite.chainID.Chain(),
suite.finalizedHeaderCache,
suite.me,
subscription.DefaultMaxGlobalStreams,
access.WithBlockSignerDecoder(suite.signerIndicesDecoder),
)
f(handler, db, all)
Expand Down Expand Up @@ -336,7 +338,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
})
require.NoError(suite.T(), err)

handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)

// Send transaction 1
resp, err := handler.SendTransaction(context.Background(), sendReq1)
Expand Down Expand Up @@ -658,7 +660,7 @@ func (suite *Suite) TestGetSealedTransaction() {
})
require.NoError(suite.T(), err)

handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)

// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
Expand Down Expand Up @@ -796,7 +798,7 @@ func (suite *Suite) TestGetTransactionResult() {
})
require.NoError(suite.T(), err)

handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
handler := access.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)

// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
Expand Down Expand Up @@ -989,7 +991,7 @@ func (suite *Suite) TestExecuteScript() {
})
require.NoError(suite.T(), err)

handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)

// initialize metrics related storage
metrics := metrics.NewNoopCollector()
Expand Down
8 changes: 8 additions & 0 deletions engine/access/apiproxy/access_api_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func (h *FlowAccessAPIRouter) GetExecutionResultByID(context context.Context, re
return res, err
}

func (h *FlowAccessAPIRouter) SubscribeBlocks(req *access.SubscribeBlocksRequest, server access.AccessAPI_SubscribeBlocksServer) error {
panic("not implemented")
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}

func (h *FlowAccessAPIRouter) SendAndSubscribeTransactionStatuses(req *access.SendAndSubscribeTransactionStatusesRequest, server access.AccessAPI_SendAndSubscribeTransactionStatusesServer) error {
panic("not implemented")
}

// FlowAccessAPIForwarder forwards all requests to a set of upstream access nodes or observers
type FlowAccessAPIForwarder struct {
*forwarder.Forwarder
Expand Down
8 changes: 4 additions & 4 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
accessmock "github.com/onflow/flow-go/engine/access/mock"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/execution"
Expand Down Expand Up @@ -118,7 +118,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {

suite.broadcaster = engine.NewBroadcaster()

suite.execDataHeroCache = herocache.NewBlockExecutionData(state_stream.DefaultCacheSize, suite.log, metrics.NewNoopCollector())
suite.execDataHeroCache = herocache.NewBlockExecutionData(subscription.DefaultCacheSize, suite.log, metrics.NewNoopCollector())
suite.execDataCache = cache.NewExecutionDataCache(suite.eds, suite.headers, suite.seals, suite.results, suite.execDataHeroCache)

accessIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleAccess))
Expand Down Expand Up @@ -232,8 +232,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
).Maybe()

conf := statestreambackend.Config{
ClientSendTimeout: state_stream.DefaultSendTimeout,
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
ClientSendTimeout: subscription.DefaultSendTimeout,
ClientSendBufferSize: subscription.DefaultSendBufferSize,
}

stateStreamBackend, err := statestreambackend.New(
Expand Down
Loading