diff --git a/snapshots/manager.go b/snapshots/manager.go index 22e453346af4..c05e77ca397b 100644 --- a/snapshots/manager.go +++ b/snapshots/manager.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "math" + "os" "sort" "sync" @@ -38,12 +39,12 @@ type Manager struct { multistore types.Snapshotter logger log.Logger - mtx sync.Mutex - operation operation - chRestore chan<- io.ReadCloser - chRestoreDone <-chan restoreDone - restoreChunkHashes [][]byte - restoreChunkIndex uint32 + mtx sync.Mutex + operation operation + chRestore chan<- uint32 + chRestoreDone <-chan restoreDone + restoreSnapshot *types.Snapshot + restoreChunkIndex uint32 } // operation represents a Manager operation. Only one operation can be in progress at a time. @@ -61,7 +62,8 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 + chunkBufferSize = 4 + chunkIDBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -134,7 +136,7 @@ func (m *Manager) endLocked() { m.chRestore = nil } m.chRestoreDone = nil - m.restoreChunkHashes = nil + m.restoreSnapshot = nil m.restoreChunkIndex = 0 } @@ -290,11 +292,18 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { } // Start an asynchronous snapshot restoration, passing chunks and completion status via channels. - chChunks := make(chan io.ReadCloser, chunkBufferSize) + chChunkIDs := make(chan uint32, chunkIDBufferSize) chDone := make(chan restoreDone, 1) + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o750); err != nil { + return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs) + go func() { - err := m.restoreSnapshot(snapshot, chChunks) + err := m.doRestoreSnapshot(snapshot, chChunks) chDone <- restoreDone{ complete: err == nil, err: err, @@ -302,17 +311,39 @@ func (m *Manager) Restore(snapshot types.Snapshot) error { close(chDone) }() - m.chRestore = chChunks + m.chRestore = chChunkIDs m.chRestoreDone = chDone - m.restoreChunkHashes = snapshot.Metadata.ChunkHashes + m.restoreSnapshot = &snapshot m.restoreChunkIndex = 0 return nil } -// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. -func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { - var nextItem types.SnapshotItem +func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser { + chunks := make(chan io.ReadCloser, chunkBufferSize) + go func() { + defer close(chunks) + + for chunkID := range chunkIDs { + chunk, err := m.store.loadChunkFile(height, format, chunkID) + if err != nil { + m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err) + break + } + chunks <- chunk + } + }() + return chunks +} + +// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed. +func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error { + dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format) + if err := os.MkdirAll(dir, 0o750); err != nil { + return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir) + } + + var nextItem types.SnapshotItem streamReader, err := NewStreamReader(chChunks) if err != nil { return err @@ -374,8 +405,13 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress") } +<<<<<<< HEAD:snapshots/manager.go if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk") +======= + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { + return false, errorsmod.Wrap(storetypes.ErrLogic, "received unexpected chunk") +>>>>>>> 81ba019e5 (feat: save restored snapshot locally (#16060)):store/snapshots/manager.go } // Check if any errors have occurred yet. @@ -391,19 +427,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { // Verify the chunk hash. hash := sha256.Sum256(chunk) - expected := m.restoreChunkHashes[m.restoreChunkIndex] + expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex] if !bytes.Equal(hash[:], expected) { return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch, "expected %x, got %x", hash, expected) } + if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil { + return false, errorsmod.Wrapf(err, "save chunk content %d", m.restoreChunkIndex) + } + // Pass the chunk to the restore, and wait for completion if it was the final one. - m.chRestore <- io.NopCloser(bytes.NewReader(chunk)) + m.chRestore <- m.restoreChunkIndex m.restoreChunkIndex++ - if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) { close(m.chRestore) m.chRestore = nil + + // the chunks are all written into files, we can save the snapshot to the db, + // even if the restoration may not completed yet. + if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil { + return false, errorsmod.Wrap(err, "save restoring snapshot") + } + done := <-m.chRestoreDone m.endLocked() if done.err != nil { @@ -412,6 +459,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { if !done.complete { return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely") } + return true, nil } return false, nil @@ -437,7 +485,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error { } defer m.endLocked() - return m.restoreSnapshot(*snapshot, ch) + return m.doRestoreSnapshot(*snapshot, ch) } // sortedExtensionNames sort extension names for deterministic iteration. diff --git a/snapshots/manager_test.go b/snapshots/manager_test.go index a5343f759fcd..9b60691508a1 100644 --- a/snapshots/manager_test.go +++ b/snapshots/manager_test.go @@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) { assert.Equal(t, expectItems, target.items) assert.Equal(t, 10, len(extSnapshotter.state)) + // The snapshot is saved in local snapshot store + snapshots, err := store.List() + require.NoError(t, err) + snapshot := snapshots[0] + require.Equal(t, uint64(3), snapshot.Height) + require.Equal(t, types.CurrentFormat, snapshot.Format) + // Starting a new restore should fail now, because the target already has contents. err = manager.Restore(types.Snapshot{ Height: 3, diff --git a/snapshots/store.go b/snapshots/store.go index 0c5b295a14e1..1087c826fab2 100644 --- a/snapshots/store.go +++ b/snapshots/store.go @@ -307,6 +307,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types return nil } +// saveChunkContent save the chunk to disk +func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error { + path := s.PathChunk(snapshot.Height, snapshot.Format, index) + return os.WriteFile(path, chunk, 0o600) +} + // saveSnapshot saves snapshot metadata to the database. func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { value, err := proto.Marshal(snapshot) diff --git a/store/CHANGELOG.md b/store/CHANGELOG.md new file mode 100644 index 000000000000..c5d108783cab --- /dev/null +++ b/store/CHANGELOG.md @@ -0,0 +1,40 @@ + + +# Changelog + +## [Unreleased] + +### Features + +- [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit. +* [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value. +* [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has. +* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restoring snapshot locally. + +## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17 + +### Features + +* [#14746](https://github.com/cosmos/cosmos-sdk/pull/14746) The `store` module is extracted to have a separate go.mod file which allows it be a standalone module. +* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.