From a974d48cdef9bf5d71cdf1f4a8cbe62b7edeabf4 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:30:03 -0800 Subject: [PATCH 1/2] [Access] Update websockets event streaming to return JSON-CDC encoded events --- .../access/rest/routes/subscribe_events_test.go | 15 +++++++++++++++ engine/access/rest/routes/websocket_handler.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/engine/access/rest/routes/subscribe_events_test.go b/engine/access/rest/routes/subscribe_events_test.go index ec48ac0586b..adc716ae6d1 100644 --- a/engine/access/rest/routes/subscribe_events_test.go +++ b/engine/access/rest/routes/subscribe_events_test.go @@ -14,6 +14,8 @@ import ( "golang.org/x/exp/slices" + jsoncdc "github.com/onflow/cadence/encoding/json" + "github.com/onflow/flow/protobuf/go/flow/entities" mocks "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -24,6 +26,7 @@ import ( mockstatestream "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" + "github.com/onflow/flow-go/utils/unittest/generator" ) type testType struct { @@ -66,6 +69,9 @@ func (s *SubscribeEventsSuite) SetupTest() { s.blocks = make([]*flow.Block, 0, blockCount) s.blockEvents = make(map[flow.Identifier]flow.EventsList, blockCount) + // by default, events are in CCF encoding + eventsGenerator := generator.EventGenerator(generator.WithEncoding(entities.EventEncodingVersion_CCF_V0)) + for i := 0; i < blockCount; i++ { block := unittest.BlockWithParentFixture(parent) // update for next iteration @@ -74,6 +80,11 @@ func (s *SubscribeEventsSuite) SetupTest() { result := unittest.ExecutionResultFixture() blockEvents := unittest.BlockEventsFixture(block.Header, (i%len(testEventTypes))*3+1, testEventTypes...) + // update payloads with valid CCF encoded data + for i := range blockEvents.Events { + blockEvents.Events[i].Payload = eventsGenerator.New().Payload + } + s.blocks = append(s.blocks, block) s.blockEvents[block.ID()] = blockEvents.Events @@ -411,6 +422,10 @@ func requireResponse(t *testing.T, recorder *testHijackResponseRecorder, expecte require.Equal(t, expectedEvent.TransactionIndex, actualEvent.TransactionIndex) require.Equal(t, expectedEvent.EventIndex, actualEvent.EventIndex) require.Equal(t, expectedEvent.Payload, actualEvent.Payload) + + // payload must decode to valid json-cdc encoded data + _, err := jsoncdc.Decode(nil, actualEvent.Payload) + require.NoError(t, err) } } } diff --git a/engine/access/rest/routes/websocket_handler.go b/engine/access/rest/routes/websocket_handler.go index 063cc4ed5c4..818e65da00d 100644 --- a/engine/access/rest/routes/websocket_handler.go +++ b/engine/access/rest/routes/websocket_handler.go @@ -15,6 +15,7 @@ import ( "github.com/onflow/flow-go/engine/access/rest/request" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" + "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" ) @@ -158,6 +159,20 @@ func (wsController *WebsocketController) writeEvents(sub state_stream.Subscripti blocksSinceLastMessage = 0 } + // EventsResponse contains CCF encoded events, and this API returns JSON-CDC events. + // convert event payload formats. + events := make([]flow.Event, len(resp.Events)) + for i, e := range resp.Events { + payload, err := convert.CcfPayloadToJsonPayload(e.Payload) + if err != nil { + err = fmt.Errorf("could not convert event payload from CCF to Json: %w", err) + wsController.wsErrorHandler(err) + return + } + events[i].Payload = payload + } + resp.Events = events + // Write the response to the WebSocket connection err = wsController.conn.WriteJSON(event) if err != nil { From 5eb72b7ab5251383681c8daf5228b436d9ed65f9 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Wed, 22 Nov 2023 15:27:54 -0800 Subject: [PATCH 2/2] Fix bug in reencoding and update tests --- .../rest/routes/subscribe_events_test.go | 39 ++++++++++++------- .../access/rest/routes/websocket_handler.go | 4 +- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/engine/access/rest/routes/subscribe_events_test.go b/engine/access/rest/routes/subscribe_events_test.go index adc716ae6d1..0b5626c64b2 100644 --- a/engine/access/rest/routes/subscribe_events_test.go +++ b/engine/access/rest/routes/subscribe_events_test.go @@ -182,26 +182,35 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() { // construct expected event responses based on the provided test configuration for i, block := range s.blocks { - if startBlockFound || block.ID() == test.startBlockID { + blockID := block.ID() + if startBlockFound || blockID == test.startBlockID { startBlockFound = true if test.startHeight == request.EmptyHeight || block.Header.Height >= test.startHeight { - eventsForBlock := flow.EventsList{} - for _, event := range s.blockEvents[block.ID()] { + // track 2 lists, one for the expected results and one that is passed back + // from the subscription to the handler. These cannot be shared since the + // response struct is passed by reference from the mock to the handler, so + // a bug within the handler could go unnoticed + expectedEvents := flow.EventsList{} + subscriptionEvents := flow.EventsList{} + for _, event := range s.blockEvents[blockID] { if slices.Contains(test.eventTypes, string(event.Type)) || - len(test.eventTypes) == 0 { //Include all events - eventsForBlock = append(eventsForBlock, event) + len(test.eventTypes) == 0 { // Include all events + expectedEvents = append(expectedEvents, event) + subscriptionEvents = append(subscriptionEvents, event) } } - eventResponse := &backend.EventsResponse{ - Height: block.Header.Height, - BlockID: block.ID(), - Events: eventsForBlock, - } - - if len(eventsForBlock) > 0 || (i+1)%int(test.heartbeatInterval) == 0 { - expectedEventsResponses = append(expectedEventsResponses, eventResponse) + if len(expectedEvents) > 0 || (i+1)%int(test.heartbeatInterval) == 0 { + expectedEventsResponses = append(expectedEventsResponses, &backend.EventsResponse{ + Height: block.Header.Height, + BlockID: blockID, + Events: expectedEvents, + }) } - subscriptionEventsResponses = append(subscriptionEventsResponses, eventResponse) + subscriptionEventsResponses = append(subscriptionEventsResponses, &backend.EventsResponse{ + Height: block.Header.Height, + BlockID: blockID, + Events: subscriptionEvents, + }) } } } @@ -421,7 +430,7 @@ func requireResponse(t *testing.T, recorder *testHijackResponseRecorder, expecte require.Equal(t, expectedEvent.TransactionID, actualEvent.TransactionID) require.Equal(t, expectedEvent.TransactionIndex, actualEvent.TransactionIndex) require.Equal(t, expectedEvent.EventIndex, actualEvent.EventIndex) - require.Equal(t, expectedEvent.Payload, actualEvent.Payload) + // payload is not expected to match, but it should decode // payload must decode to valid json-cdc encoded data _, err := jsoncdc.Decode(nil, actualEvent.Payload) diff --git a/engine/access/rest/routes/websocket_handler.go b/engine/access/rest/routes/websocket_handler.go index 818e65da00d..221a18ea7b0 100644 --- a/engine/access/rest/routes/websocket_handler.go +++ b/engine/access/rest/routes/websocket_handler.go @@ -161,7 +161,6 @@ func (wsController *WebsocketController) writeEvents(sub state_stream.Subscripti // EventsResponse contains CCF encoded events, and this API returns JSON-CDC events. // convert event payload formats. - events := make([]flow.Event, len(resp.Events)) for i, e := range resp.Events { payload, err := convert.CcfPayloadToJsonPayload(e.Payload) if err != nil { @@ -169,9 +168,8 @@ func (wsController *WebsocketController) writeEvents(sub state_stream.Subscripti wsController.wsErrorHandler(err) return } - events[i].Payload = payload + resp.Events[i].Payload = payload } - resp.Events = events // Write the response to the WebSocket connection err = wsController.conn.WriteJSON(event)