Skip to content

Commit

Permalink
feat: save restored snapshot locally (#16060)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
(cherry picked from commit 81ba019)

# Conflicts:
#	snapshots/manager.go
#	store/CHANGELOG.md
  • Loading branch information
yihuang authored and mergify[bot] committed May 23, 2023
1 parent 273386f commit 1531efe
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 19 deletions.
86 changes: 67 additions & 19 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -38,12 +39,12 @@ type Manager struct {
multistore types.Snapshotter
logger log.Logger

mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
mtx sync.Mutex
operation operation
chRestore chan<- uint32
chRestoreDone <-chan restoreDone
restoreSnapshot *types.Snapshot
restoreChunkIndex uint32
}

// operation represents a Manager operation. Only one operation can be in progress at a time.
Expand All @@ -61,7 +62,8 @@ const (
opPrune operation = "prune"
opRestore operation = "restore"

chunkBufferSize = 4
chunkBufferSize = 4
chunkIDBufferSize = 1024

snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
Expand Down Expand Up @@ -134,7 +136,7 @@ func (m *Manager) endLocked() {
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreSnapshot = nil
m.restoreChunkIndex = 0
}

Expand Down Expand Up @@ -290,29 +292,58 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
}

// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chChunkIDs := make(chan uint32, chunkIDBufferSize)
chDone := make(chan restoreDone, 1)

dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
}

chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)

go func() {
err := m.restoreSnapshot(snapshot, chChunks)
err := m.doRestoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()

m.chRestore = chChunks
m.chRestore = chChunkIDs
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreSnapshot = &snapshot
m.restoreChunkIndex = 0
return nil
}

// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
var nextItem types.SnapshotItem
// loadChunkStream converts a stream of chunk IDs into a stream of chunk readers.
//
// It spawns a goroutine that receives IDs from chunkIDs, opens the matching
// chunk via m.store.loadChunkFile for the given height and format, and sends
// the resulting reader on the returned channel (buffered to chunkBufferSize).
// The goroutine ends, closing the returned channel, when chunkIDs is closed or
// when a chunk fails to load; a load failure is only logged, so the consumer
// observes it as the stream ending early.
func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
	out := make(chan io.ReadCloser, chunkBufferSize)

	feed := func() {
		// Closing the output is the only completion signal the consumer gets.
		defer close(out)

		for id := range chunkIDs {
			reader, err := m.store.loadChunkFile(height, format, id)
			if err != nil {
				m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", id, "err", err)
				return
			}
			out <- reader
		}
	}
	go feed()

	return out
}

// doRestoreSnapshot does the heavy work of snapshot restoration after the preliminary checks on the request have passed.
func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
}

var nextItem types.SnapshotItem
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
Expand Down Expand Up @@ -374,8 +405,13 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
}

<<<<<<< HEAD:snapshots/manager.go
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
=======
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
return false, errorsmod.Wrap(storetypes.ErrLogic, "received unexpected chunk")
>>>>>>> 81ba019e5 (feat: save restored snapshot locally (#16060)):store/snapshots/manager.go
}

// Check if any errors have occurred yet.
Expand All @@ -391,19 +427,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {

// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}

if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
return false, errorsmod.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
}

// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.chRestore <- m.restoreChunkIndex
m.restoreChunkIndex++

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
close(m.chRestore)
m.chRestore = nil

// The chunks have all been written to files, so we can save the snapshot to the db
// even if the restoration may not have completed yet.
if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
return false, errorsmod.Wrap(err, "save restoring snapshot")
}

done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
Expand All @@ -412,6 +459,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
if !done.complete {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
}

return true, nil
}
return false, nil
Expand All @@ -437,7 +485,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error {
}
defer m.endLocked()

return m.restoreSnapshot(*snapshot, ch)
return m.doRestoreSnapshot(*snapshot, ch)
}

// sortedExtensionNames sort extension names for deterministic iteration.
Expand Down
7 changes: 7 additions & 0 deletions snapshots/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) {
assert.Equal(t, expectItems, target.items)
assert.Equal(t, 10, len(extSnapshotter.state))

// The snapshot is saved in local snapshot store
snapshots, err := store.List()
require.NoError(t, err)
snapshot := snapshots[0]
require.Equal(t, uint64(3), snapshot.Height)
require.Equal(t, types.CurrentFormat, snapshot.Format)

// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
Height: 3,
Expand Down
6 changes: 6 additions & 0 deletions snapshots/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types
return nil
}

// saveChunkContent writes the raw bytes of a single chunk to disk.
//
// The destination is the path returned by s.PathChunk for the snapshot's
// height and format and the given chunk index. The file is written with mode
// 0o600. os.WriteFile does not create parent directories, so the snapshot
// directory must already exist. Any write error is returned unchanged.
func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
	target := s.PathChunk(snapshot.Height, snapshot.Format, index)
	if err := os.WriteFile(target, chunk, 0o600); err != nil {
		return err
	}
	return nil
}

// saveSnapshot saves snapshot metadata to the database.
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
value, err := proto.Marshal(snapshot)
Expand Down
40 changes: 40 additions & 0 deletions store/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<!--
Guiding Principles:
Changelogs are for humans, not machines.
There should be an entry for every single version.
The same types of changes should be grouped.
Versions and sections should be linkable.
The latest version comes first.
The release date of each version is displayed.
Mention whether you follow Semantic Versioning.
Usage:
Change log entries are to be added to the Unreleased section under the
appropriate stanza (see below). Each entry should ideally include a tag and
the Github issue reference in the following format:
* (<tag>) [#<issue-number>] Changelog message.
Types of changes (Stanzas):
"Features" for new features.
"Improvements" for changes in existing functionality.
"Deprecated" for soon-to-be removed features.
"Bug Fixes" for any bug fixes.
"API Breaking" for breaking exported APIs used by developers building on SDK.
Ref: https://keepachangelog.com/en/1.0.0/
-->

# Changelog

## [Unreleased]

### Features

* [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit.
* [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value.
* [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has.
* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restored snapshots locally.

## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17

### Features

* [#14746](https://github.com/cosmos/cosmos-sdk/pull/14746) The `store` module is extracted to have a separate go.mod file, which allows it to be a standalone module.
* [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` now validates that every module store's height is correct; it returns an error if any module store has an incorrect height.

0 comments on commit 1531efe

Please sign in to comment.