Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Index Execution Data #4653

Merged
merged 230 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 222 commits
Commits
Show all changes
230 commits
Select commit Hold shift + click to select a range
9d8151d
[Access] Index execution data
peterargue Aug 11, 2023
c1ac532
integrate index engine into AN
peterargue Aug 12, 2023
460e418
implement script state execution struct
Aug 23, 2023
b2a714b
define register index store interface
Aug 23, 2023
522e615
rename register to registers
Aug 23, 2023
4fa8a74
change register types
Aug 23, 2023
cd7ed77
simple in-memory register index
Aug 23, 2023
3008022
add indexer module interface
Aug 23, 2023
fe6b0ef
add indexer module
Aug 23, 2023
fff7f95
update indexer type
Aug 23, 2023
f17f63f
use indexer module
Aug 23, 2023
f5d952d
store last index
Aug 23, 2023
3f0d81f
add height for block
Aug 23, 2023
2d3e711
add store commitment
Aug 23, 2023
6edc290
implement state commitment
Aug 23, 2023
dfce5f2
storage snapshot
Aug 23, 2023
eab225a
change script execution state interface
Aug 23, 2023
f85a1af
remove unneeded
Aug 23, 2023
89e90db
change script execution state type
Aug 23, 2023
a713dca
update to indexer for block height
Aug 23, 2023
ab3d3c6
divide indexer into two reader and writer
Aug 23, 2023
e7c9f06
update error
Aug 23, 2023
e3dee5b
update indexer builder
Aug 23, 2023
6ea95c4
add script execution state to indexer interface
Aug 24, 2023
92eb46a
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Aug 29, 2023
ab804c1
add comments and update the interface names
Aug 29, 2023
44d83c2
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Aug 29, 2023
f84e15d
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Aug 29, 2023
2c2b9b9
update changes to store interface
Aug 29, 2023
7203286
rename indexer and unexport
Aug 29, 2023
93394e9
init from the persisted height
Aug 29, 2023
6f62ab3
add flag for index last height
Aug 29, 2023
7a2bfdb
export indexer
Aug 29, 2023
53bbea4
remove execution state interface
Aug 29, 2023
8b42560
todo commitment
Aug 29, 2023
e72cd6e
update indexer batch update
Aug 29, 2023
5f7a033
change the name and interfaces
Aug 30, 2023
024769a
add index events
Aug 30, 2023
c833dc4
change names of method to index
Aug 30, 2023
b71cd05
remove unneded index
Aug 30, 2023
457d66c
add events
Aug 30, 2023
9ddf1ed
move indexer and worker in the statesync
Aug 31, 2023
e1f9942
add comments
Aug 31, 2023
629ec00
remove unneded engine
Aug 31, 2023
5361a42
Update storage/memory/registers.go
sideninja Aug 31, 2023
36ada73
update the processed index func
Sep 1, 2023
1744e1a
fix comment
Sep 1, 2023
c6486cf
update return type
Sep 1, 2023
ceb81fc
create builder for execution state worker
Sep 1, 2023
ddcc16f
remove unneeded interface
Sep 1, 2023
a597e60
comment execution state indexer
Sep 1, 2023
dda7808
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Sep 1, 2023
aab0869
add sequental counter
Sep 1, 2023
27bfa92
better handle context and add concurency
Sep 1, 2023
aea2ec2
add comment
Sep 1, 2023
c5c417d
add comment
Sep 1, 2023
2302623
add comment
Sep 1, 2023
3041db6
Merge remote-tracking branch 'origin/gregor/index-exeuction-data' int…
Sep 1, 2023
3ebde12
fix changes in storage interface
Sep 1, 2023
665af97
change to only 1 worker
Sep 4, 2023
f8da887
rewire highest available execution data
Sep 4, 2023
8c61179
check bounds
Sep 4, 2023
5aa7e23
pass in start height
Sep 4, 2023
db4aa16
fix test api change
Sep 4, 2023
6a78a1f
add tests for block height and commitments
Sep 4, 2023
5d039a8
add storage mock
Sep 4, 2023
640d123
add event mock
Sep 4, 2023
9167571
test generator
Sep 5, 2023
eddfee9
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Sep 5, 2023
5868f09
update registers memory
Sep 5, 2023
0da5e96
update execution with new register method
Sep 5, 2023
7a44b07
update mocks
Sep 5, 2023
b46d27d
define test struct
Sep 5, 2023
b19a543
Merge branch 'gregor/register-storage' into gregor/index-exeuction-data
Sep 5, 2023
b2cb66f
update register index changes
Sep 5, 2023
f71b2fd
change the boundaries check
Sep 5, 2023
24a7484
execution tests fixtures
Sep 7, 2023
e11dd91
revert change
Sep 7, 2023
6a199a6
merge same registers and remove commitments
Sep 8, 2023
c2baca1
test that registers get merged
Sep 8, 2023
b8e19e6
extract boundary check
Sep 8, 2023
7cadec6
boundary check
Sep 8, 2023
9ba18ce
check indexing height
Sep 8, 2023
99cc238
fix comments
Sep 8, 2023
fb84b9e
add todo
Sep 8, 2023
431fcba
update comments
Sep 8, 2023
29b23b0
fix order of tests
Sep 8, 2023
33b37c2
add get register test
Sep 8, 2023
3714065
add get register test
Sep 8, 2023
3dcf82f
add nil check
Sep 13, 2023
4525eaa
add index range type
Sep 13, 2023
22ba479
update comments
Sep 13, 2023
328a840
provide whole interval
Sep 13, 2023
9bea8b6
use the index range
Sep 13, 2023
263740e
add validation
Sep 13, 2023
10ec365
fix index range init
Sep 13, 2023
3398ae3
fix test init
Sep 13, 2023
8df0be0
add range tests
Sep 13, 2023
fad1f91
remove unneeded func calls
Sep 13, 2023
dbbdbf5
build a component
Sep 13, 2023
21565f2
return an error on init and add comments
Sep 13, 2023
f9d2738
add headers to test
Sep 13, 2023
f1ad131
change worker startup
Sep 13, 2023
77e1ee3
add worker tests wip
Sep 13, 2023
7942fa9
fix range check
Sep 14, 2023
f103775
add start method to worker
Sep 14, 2023
44f678e
make progress test
Sep 14, 2023
da9d5d8
add trie register payload comparer
Sep 14, 2023
f06c4c2
update test using comparer
Sep 14, 2023
9ef1bf6
fix execution payload comparer
Sep 15, 2023
118f0c3
worker test simple case
Sep 15, 2023
90d0d9e
add block by id
Sep 15, 2023
f8be49c
add block by id on indexer test
Sep 15, 2023
cc2afc5
assert store called times
Sep 15, 2023
5725c79
add signaler context with expected error
Sep 15, 2023
c755e0c
use signaler context with expected error
Sep 15, 2023
852c2ee
add note
Sep 15, 2023
ef5603f
refactor worker test
Sep 15, 2023
b387757
fix exposed type
Sep 15, 2023
621abef
revert changes to get highest index
Sep 15, 2023
5c0f1cf
fix building indexer
Sep 15, 2023
6026189
highest index
Sep 15, 2023
37f4d0c
update register interface
Sep 18, 2023
df2d58f
update register interface
Sep 18, 2023
cd06179
update mocks
Sep 18, 2023
be2e074
add debug logging
Sep 18, 2023
bce162e
fix a few issues from localnet testing
peterargue Sep 18, 2023
1418631
fix building up
Sep 18, 2023
b0d53ec
fix if statement
Sep 18, 2023
c5165c8
Merge remote-tracking branch 'origin/petera/indexer-updates' into gre…
Sep 18, 2023
60decce
add store latest height
Sep 18, 2023
40f59c2
extract var on building
Sep 18, 2023
a7f78a8
add logging
Sep 18, 2023
6a5e083
correct errors on in-memory
Sep 18, 2023
ecf32f3
Merge branch 'master' into gregor/index-exeuction-data
Sep 18, 2023
290edad
merge master
Sep 18, 2023
a3cb4cf
add log with component
Sep 19, 2023
9842cd3
register store memory fix init
Sep 19, 2023
a2656be
memory get
Sep 20, 2023
a37d16e
memory init
Sep 20, 2023
51561f1
add storage integration tests
Sep 20, 2023
5ad8f1a
fix logging
Sep 20, 2023
873b77f
add admin command to get register values - temp
Sep 20, 2023
fe8190a
remove done todo
Sep 20, 2023
3a4ce09
improve logging
Sep 20, 2023
d16d52f
fix decoding arg
Sep 20, 2023
7281c9f
init register storage
Sep 20, 2023
d61ae2b
return hex encoded value
Sep 20, 2023
e663be2
improve range check
Sep 21, 2023
ef05316
remove unused
Sep 21, 2023
110a65f
fix error message
Sep 21, 2023
383fd4f
update register interface comments
Sep 21, 2023
6600c6a
update comments
Sep 21, 2023
650bfd4
fix tests
Sep 21, 2023
1834a19
update comment
Sep 21, 2023
21c6370
update init
Sep 21, 2023
4d138c5
extract command to another PR
Sep 21, 2023
cffa6ab
Merge branch 'master' into gregor/index-exeuction-data
Sep 21, 2023
7ad8f2e
leave a comment
Sep 21, 2023
6f214b6
removed admin
Sep 21, 2023
6a4ac33
remove unused
Sep 21, 2023
337411c
remove builder
Sep 22, 2023
d968119
remove builder
Sep 22, 2023
a5b89bb
Revert "change script execution state interface"
Sep 22, 2023
53bd8c7
revert to master
Sep 22, 2023
c18ec6c
not needed
Sep 22, 2023
a9bb46c
Merge branch 'master' into gregor/index-exeuction-data
Sep 22, 2023
c9ff79f
remove notes
Sep 22, 2023
c6011a0
add note
Sep 22, 2023
fd88385
formatting
Sep 22, 2023
34da4eb
improve looping
Sep 22, 2023
85688ce
use atomic latest index
Sep 22, 2023
aa98c5e
fix tests
Sep 22, 2023
e71f880
indexer mock update
Sep 22, 2023
f62cbd8
change order for init
Sep 22, 2023
1d638ce
fix tests
Sep 22, 2023
5550b73
Update module/state_synchronization/requester/jobs/execution_data_rea…
sideninja Sep 25, 2023
45d5a7d
remove unneeded context
Sep 25, 2023
e7d75a0
drop unneeded methods
Sep 25, 2023
eea9e5c
hide component
Sep 25, 2023
ba5584a
revert version boundaries
Sep 25, 2023
8a0247b
Merge branch 'master' into gregor/index-exeuction-data
Sep 25, 2023
0fc9143
delete memory implementation of storage
Sep 25, 2023
6d4899c
remove memory tests
Sep 25, 2023
efd9a3e
fix integration tests
Sep 25, 2023
ea8a1f9
remove unneeded check
Sep 25, 2023
44761dc
improve range test
Sep 25, 2023
dfd8ba8
remove range and tests
Sep 26, 2023
d4a4068
add the check
Sep 26, 2023
f4bebe2
implement height check
Sep 26, 2023
9f99b05
fix tests without range check
Sep 27, 2023
d93986a
fix tests without range check
Sep 27, 2023
9dac94c
fix tests
Sep 27, 2023
164eab8
fix api changes
Sep 27, 2023
986a6ed
log reindexing
Sep 27, 2023
327cc3f
fix mocks with api changes
Sep 27, 2023
c73b421
fix tests with storage interface changes
Sep 27, 2023
3603e10
fix tests with storage interface changes
Sep 27, 2023
baeb324
rename indexer and worker
Sep 27, 2023
8dc7aee
remove todo
Sep 27, 2023
2d80e88
Merge branch 'master' into gregor/index-exeuction-data
Sep 27, 2023
be947b7
remove unneeded mocks
Sep 27, 2023
a00feb3
rename
Sep 27, 2023
d923b22
update test names
Sep 27, 2023
2885c02
revert test changes
Sep 27, 2023
b57184e
update deprecated
Sep 27, 2023
62a79c5
Merge branch 'master' into gregor/index-exeuction-data
Sep 27, 2023
008496d
convert package usage
Sep 27, 2023
db0f147
convert package usage
Sep 27, 2023
fb52463
remove fmt
Sep 27, 2023
c43497a
move itteration so it doesn't block
Sep 27, 2023
e6f4031
remove check for nil events
Sep 27, 2023
fb0cfb0
rename method
Sep 27, 2023
004b5ec
embeed component
Sep 27, 2023
b58b477
remove outdated comment
Sep 27, 2023
4451010
remove unneeded context
Sep 27, 2023
740ffd6
rename receiver
Sep 27, 2023
a62d952
update go doc
Sep 27, 2023
5fe52ca
create atomic value
Sep 27, 2023
801124e
remove error
Sep 27, 2023
73ac361
mock events
Sep 27, 2023
826d1f7
mock events
Sep 27, 2023
f4dc6e0
change the index registers API
Sep 28, 2023
46d0672
mock reached index
Sep 28, 2023
ea9a756
mock reached index
Sep 28, 2023
35b2d87
return an error
Sep 28, 2023
21a5730
append
Sep 28, 2023
69dbbfb
Merge branch 'master' into gregor/index-exeuction-data
peterargue Sep 28, 2023
cb77a4b
Merge branch 'master' into gregor/index-exeuction-data
peterargue Sep 28, 2023
b5f9804
fix merge conflict in unittests
peterargue Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion model/convert/service_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/coreos/go-semver/semver"

"github.com/onflow/cadence"
"github.com/onflow/cadence/encoding/ccf"

Expand Down
17 changes: 16 additions & 1 deletion module/irrecoverable/unittest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@ package irrecoverable
import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

// MockSignalerContext is a SignalerContext which will immediately fail a test if an error is thrown.
type MockSignalerContext struct {
context.Context
t *testing.T
t *testing.T
expectError error
}

var _ SignalerContext = &MockSignalerContext{}

func (m MockSignalerContext) sealed() {}

func (m MockSignalerContext) Throw(err error) {
if m.expectError != nil {
assert.EqualError(m.t, err, m.expectError.Error())
return
}
m.t.Fatalf("mock signaler context received error: %v", err)
}

Expand All @@ -30,3 +37,11 @@ func NewMockSignalerContextWithCancel(t *testing.T, parent context.Context) (*Mo
ctx, cancel := context.WithCancel(parent)
return NewMockSignalerContext(t, ctx), cancel
}

func NewMockSignalerContextExpectError(t *testing.T, ctx context.Context, err error) *MockSignalerContext {
return &MockSignalerContext{
Context: ctx,
t: t,
expectError: err,
}
}
2 changes: 2 additions & 0 deletions module/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (

ConsumeProgressExecutionDataRequesterBlockHeight = "ConsumeProgressExecutionDataRequesterBlockHeight"
ConsumeProgressExecutionDataRequesterNotification = "ConsumeProgressExecutionDataRequesterNotification"

ConsumeProgressExecutionDataIndexerBlockHeight = "ConsumeProgressExecutionDataIndexerBlockHeight"
)

// JobID is a unique ID of the job.
Expand Down
99 changes: 99 additions & 0 deletions module/state_synchronization/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package indexer

import (
"time"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/jobqueue"
"github.com/onflow/flow-go/module/state_synchronization/requester/jobs"
"github.com/onflow/flow-go/storage"
)

const (
workersCount = 1 // how many workers will concurrently process the tasks in the jobqueue
searchAhead = 1 // how many block heights ahead of the current will be requested and tasked for jobqueue
)

// Indexer handles ingestion of new execution data available and uses the execution data indexer module
// to index the data.
// The processing of new available data is done by creating a jobqueue that uses the execution data reader to
// obtain new jobs. The worker also implements the `highestConsecutiveHeight` method which is used by the execution
// data reader, so it doesn't surpass the highest sealed block height when fetching the data.
// The execution state worker has a callback that is used by the upstream queues which download new execution data to
// notify new data is available and kick off indexing.
type Indexer struct {
component.Component
log zerolog.Logger
exeDataReader *jobs.ExecutionDataReader
exeDataNotifier engine.Notifier
indexer *IndexerCore
}

// NewIndexer creates a new execution worker.
func NewIndexer(
log zerolog.Logger,
initHeight uint64,
fetchTimeout time.Duration,
indexer *IndexerCore,
executionCache *cache.ExecutionDataCache,
executionDataLatestHeight func() (uint64, error),
processedHeight storage.ConsumerProgress,
) *Indexer {
r := &Indexer{
exeDataNotifier: engine.NewNotifier(),
indexer: indexer,
}

r.exeDataReader = jobs.NewExecutionDataReader(executionCache, fetchTimeout, executionDataLatestHeight)

// create a jobqueue that will process new available block execution data. The `exeDataNotifier` is used to
// signal new work, which is being triggered on the `OnExecutionData` handler.
r.Component = jobqueue.NewComponentConsumer(
log.With().Str("module", "execution_indexer").Logger(),
r.exeDataNotifier.Channel(),
processedHeight,
r.exeDataReader,
initHeight,
r.processExecutionData,
workersCount,
searchAhead,
)

return r
}

// Start the worker jobqueue to consume the available data.
func (i *Indexer) Start(ctx irrecoverable.SignalerContext) {
i.exeDataReader.AddContext(ctx)
i.Component.Start(ctx)
}

// OnExecutionData is used to notify when new execution data is downloaded by the execution data requester jobqueue.
func (i *Indexer) OnExecutionData(_ *execution_data.BlockExecutionDataEntity) {
i.exeDataNotifier.Notify()
}

// processExecutionData is a worker method that is being called by the jobqueue when processing a new job.
// The job data contains execution data which we provide to the execution indexer to index it.
func (i *Indexer) processExecutionData(ctx irrecoverable.SignalerContext, job module.Job, done func()) {
entry, err := jobs.JobToBlockEntry(job)
if err != nil {
i.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error converting execution data job")
ctx.Throw(err)
}

err = i.indexer.IndexBlockData(entry.ExecutionData)
if err != nil {
i.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error during execution data index processing job")
ctx.Throw(err)
}

done()
}
174 changes: 174 additions & 0 deletions module/state_synchronization/indexer/indexer_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package indexer

import (
"fmt"

"github.com/rs/zerolog"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)

// IndexerCore indexes the execution state.
type IndexerCore struct {
registers storage.RegisterIndex
headers storage.Headers
events storage.Events
log zerolog.Logger
}

// New execution state indexer used to ingest block execution data and index it by height.
// The passed RegisterIndex storage must be populated to include the first and last height otherwise the indexer
// won't be initialized to ensure we have bootstrapped the storage first.
func New(
log zerolog.Logger,
registers storage.RegisterIndex,
headers storage.Headers,
events storage.Events,
) (*IndexerCore, error) {
return &IndexerCore{
registers: registers,
headers: headers,
events: events,
log: log.With().Str("component", "execution_indexer").Logger(),
}, nil
}

// RegisterValues retrieves register values by the register IDs at the provided block height.
// Even if the register wasn't indexed at the provided height, returns the highest height the register was indexed at.
// Expected errors:
// - storage.ErrNotFound if the register by the ID was never indexed
func (c *IndexerCore) RegisterValues(IDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) {
values := make([]flow.RegisterValue, len(IDs))

for j, id := range IDs {
value, err := c.registers.Get(id, height)
if err != nil {
return nil, err
}

values[j] = value
}

return values, nil
}

// IndexBlockData indexes all execution block data by height.
// This method shouldn't be used concurrently.
// Expected errors:
// - storage.ErrNotFound if the block for execution data was not found
func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEntity) error {
block, err := c.headers.ByBlockID(data.BlockID)
if err != nil {
return fmt.Errorf("could not get the block by ID %s: %w", data.BlockID, err)
}

lg := c.log.With().
Hex("block_id", logging.ID(data.BlockID)).
Uint64("height", block.Height).
Logger()

lg.Debug().Msgf("indexing new block")

// the height we are indexing must be exactly one bigger or same as the latest height indexed from the storage
latest := c.registers.LatestHeight()
if block.Height != latest+1 && block.Height != latest {
return fmt.Errorf("must store registers with the next height %d, but got %d", latest+1, block.Height)
}
// allow rerunning the indexer for same height since we are fetching height from register storage, but there are other storages
// for indexing resources which might fail to update the values, so this enables rerunning and reindexing those resources
if block.Height == latest {
lg.Warn().Msg("reindexing block data")
}

// concurrently process indexing of block data
g := errgroup.Group{}

g.Go(func() error {
events := make([]flow.Event, 0)
for _, chunk := range data.ChunkExecutionDatas {
events = append(events, chunk.Events...)
}

err := c.indexEvents(data.BlockID, events)
if err != nil {
return fmt.Errorf("could not index events at height %d: %w", block.Height, err)
}
return nil
})

g.Go(func() error {
updates := make([]*ledger.TrieUpdate, 0)

for _, chunk := range data.ChunkExecutionDatas {
if chunk.TrieUpdate != nil {
updates = append(updates, chunk.TrieUpdate)
}
}
sideninja marked this conversation as resolved.
Show resolved Hide resolved

// we are iterating all the registers and overwrite any existing register at the same path
// this will make sure if we have multiple register changes only the last change will get persisted
// if block has two chucks:
// first chunk updates: { X: 1, Y: 2 }
// second chunk updates: { X: 2 }
// then we should persist only {X: 2: Y: 2}
payloads := make(map[ledger.Path]*ledger.Payload)
for _, update := range updates {
for i, path := range update.Paths {
payloads[path] = update.Payloads[i]
}
}

err = c.indexRegisters(maps.Values(payloads), block.Height)
sideninja marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not index register payloads at height %d: %w", block.Height, err)
}

lg.Debug().
Int("register_count", len(payloads)).
Msg("indexed registers")

return nil
})

err = g.Wait()
if err != nil {
return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err)
}
return nil
}

func (c *IndexerCore) indexEvents(blockID flow.Identifier, events flow.EventsList) error {
// Note: the last chunk in an execution data is the system chunk. All events in that ChunkExecutionData are service events.
err := c.events.Store(blockID, []flow.EventsList{events})
return err
}

func (c *IndexerCore) indexRegisters(payloads []*ledger.Payload, height uint64) error {
regEntries := make(flow.RegisterEntries, len(payloads))

for j, payload := range payloads {
k, err := payload.Key()
if err != nil {
return err
}

id, err := convert.LedgerKeyToRegisterID(k)
if err != nil {
return err
}

regEntries[j] = flow.RegisterEntry{
Key: id,
Value: payload.Value(),
}
}

return c.registers.Store(regEntries, height)
}
Loading
Loading