Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Update websockets event streaming to return JSON-CDC encoded events #5048

Merged
merged 3 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions engine/access/rest/routes/subscribe_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

"golang.org/x/exp/slices"

jsoncdc "github.com/onflow/cadence/encoding/json"
"github.com/onflow/flow/protobuf/go/flow/entities"
mocks "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -24,6 +26,7 @@ import (
mockstatestream "github.com/onflow/flow-go/engine/access/state_stream/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/generator"
)

type testType struct {
Expand Down Expand Up @@ -66,6 +69,9 @@ func (s *SubscribeEventsSuite) SetupTest() {
s.blocks = make([]*flow.Block, 0, blockCount)
s.blockEvents = make(map[flow.Identifier]flow.EventsList, blockCount)

// by default, events are in CCF encoding
eventsGenerator := generator.EventGenerator(generator.WithEncoding(entities.EventEncodingVersion_CCF_V0))

for i := 0; i < blockCount; i++ {
block := unittest.BlockWithParentFixture(parent)
// update for next iteration
Expand All @@ -74,6 +80,11 @@ func (s *SubscribeEventsSuite) SetupTest() {
result := unittest.ExecutionResultFixture()
blockEvents := unittest.BlockEventsFixture(block.Header, (i%len(testEventTypes))*3+1, testEventTypes...)

// update payloads with valid CCF encoded data
for i := range blockEvents.Events {
blockEvents.Events[i].Payload = eventsGenerator.New().Payload
}

s.blocks = append(s.blocks, block)
s.blockEvents[block.ID()] = blockEvents.Events

Expand Down Expand Up @@ -171,26 +182,35 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {

// construct expected event responses based on the provided test configuration
for i, block := range s.blocks {
if startBlockFound || block.ID() == test.startBlockID {
blockID := block.ID()
if startBlockFound || blockID == test.startBlockID {
startBlockFound = true
if test.startHeight == request.EmptyHeight || block.Header.Height >= test.startHeight {
eventsForBlock := flow.EventsList{}
for _, event := range s.blockEvents[block.ID()] {
// track 2 lists, one for the expected results and one that is passed back
// from the subscription to the handler. These cannot be shared since the
// response struct is passed by reference from the mock to the handler, so
// a bug within the handler could go unnoticed
expectedEvents := flow.EventsList{}
subscriptionEvents := flow.EventsList{}
for _, event := range s.blockEvents[blockID] {
if slices.Contains(test.eventTypes, string(event.Type)) ||
len(test.eventTypes) == 0 { //Include all events
eventsForBlock = append(eventsForBlock, event)
len(test.eventTypes) == 0 { // Include all events
expectedEvents = append(expectedEvents, event)
subscriptionEvents = append(subscriptionEvents, event)
}
}
eventResponse := &backend.EventsResponse{
Height: block.Header.Height,
BlockID: block.ID(),
Events: eventsForBlock,
}

if len(eventsForBlock) > 0 || (i+1)%int(test.heartbeatInterval) == 0 {
expectedEventsResponses = append(expectedEventsResponses, eventResponse)
if len(expectedEvents) > 0 || (i+1)%int(test.heartbeatInterval) == 0 {
expectedEventsResponses = append(expectedEventsResponses, &backend.EventsResponse{
Height: block.Header.Height,
BlockID: blockID,
Events: expectedEvents,
})
}
subscriptionEventsResponses = append(subscriptionEventsResponses, eventResponse)
subscriptionEventsResponses = append(subscriptionEventsResponses, &backend.EventsResponse{
Height: block.Header.Height,
BlockID: blockID,
Events: subscriptionEvents,
})
}
}
}
Expand Down Expand Up @@ -410,7 +430,11 @@ func requireResponse(t *testing.T, recorder *testHijackResponseRecorder, expecte
require.Equal(t, expectedEvent.TransactionID, actualEvent.TransactionID)
require.Equal(t, expectedEvent.TransactionIndex, actualEvent.TransactionIndex)
require.Equal(t, expectedEvent.EventIndex, actualEvent.EventIndex)
require.Equal(t, expectedEvent.Payload, actualEvent.Payload)
// payload is not expected to match, but it should decode

// payload must decode to valid json-cdc encoded data
_, err := jsoncdc.Decode(nil, actualEvent.Payload)
require.NoError(t, err)
}
}
}
13 changes: 13 additions & 0 deletions engine/access/rest/routes/websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/engine/access/rest/request"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -158,6 +159,18 @@ func (wsController *WebsocketController) writeEvents(sub state_stream.Subscripti
blocksSinceLastMessage = 0
}

// EventsResponse contains CCF encoded events, and this API returns JSON-CDC events.
// convert event payload formats.
for i, e := range resp.Events {
payload, err := convert.CcfPayloadToJsonPayload(e.Payload)
if err != nil {
err = fmt.Errorf("could not convert event payload from CCF to Json: %w", err)
wsController.wsErrorHandler(err)
return
}
resp.Events[i].Payload = payload
}

// Write the response to the WebSocket connection
err = wsController.conn.WriteJSON(event)
if err != nil {
Expand Down
Loading