Implement partition store restore-from-snapshot #2353
Conversation
Thanks for creating this PR @pcholakov. The changes look really nice! I had a few minor questions. It would be great to add the streaming write before merging. Once this is resolved, +1 for merging :-)
));
let file_path = snapshot_dir.path().join(filename);
let file_data = self.object_store.get(&key).await?;
tokio::fs::write(&file_path, file_data.bytes().await?).await?;
Yes, it would indeed be great to write the file in streaming fashion to disk. Especially once our SSTs grow.
Maybe something like
let mut file_data = self.object_store.get(&key).await?.into_stream();
let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
while let Some(data) = file_data.next().await {
    snapshot_file.write_all(&data?).await?;
}
can already be enough. Do you know how large the chunks of the stream returned by self.object_store.get(&key).await?.into_stream() will be?
let partition_store = if !partition_store_manager
    .has_partition_store(pp_builder.partition_id)
    .await
Out of scope for this PR: what is the plan for handling a PP that has some data but is lagging too far behind, so that starting the PP would result in a trim gap? Would we then drop the respective column family and restart it?
I haven't tracked down all the places that would need to be updated yet but I believe we can shut down the PP, drop the existing CF (or just move it out of the way), and follow the bootstrap path from there.
/// Discover and download the latest snapshot available. Dropping the returned
/// `LocalPartitionSnapshot` will delete the local snapshot data files.
Is it because the files are stored in a temp directory? On LocalPartitionSnapshot itself I couldn't find how the files are deleted when dropping it.
Is the temp dir also the mechanism to clean things up if downloading it failed?
It seems that TempDir::with_prefix_in takes care of it, since it deletes the files when it gets dropped. This is a nice solution!
This is exactly right! I'm not 100% in love with it - it works, but it's a bit magical, as deletion happens implicitly when LocalPartitionSnapshot is dropped, and that could be quite far removed from the find_latest call. Something that's hard to do with this approach is to retain the snapshot files if importing them fails, which could be useful for troubleshooting.
The description seems off though. If I understand the code correctly, then dropping LocalPartitionSnapshot won't delete the files. What will happen via TempDir is that if an error occurs before we call snapshot_dir.into_path(), then the snapshot_dir will be deleted. Afterwards it's the responsibility of whoever owns LocalPartitionSnapshot to clean things up.
I believe the description is accurate :-) Ownership of the TempDir moves into the LocalPartitionSnapshot instance when we construct it with base_dir: snapshot_dir.into_path(). Dropping the LocalPartitionSnapshot thus drops the TempDir.
Did you verify that this is really how things work? According to the Rust docs of TempDir::into_path, it reads a bit differently:

Persist the temporary directory to disk, returning the PathBuf where it is located. This consumes the TempDir without deleting directory on the filesystem, meaning that the directory will no longer be automatically deleted.

What into_path() returns is a PathBuf, and PathBuf does not have the functionality to delete the file it points to once it gets dropped.
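For reference, the ownership semantics under discussion can be mimicked with a small std-only guard. This is a hypothetical stand-in for tempfile's TempDir, not the crate's actual code: the directory is removed on drop unless into_path() is called, which disarms the cleanup exactly as the docs quoted above describe.

```rust
use std::fs;
use std::path::PathBuf;

/// Minimal stand-in for tempfile::TempDir: removes the directory on drop
/// unless ownership of the path is taken via into_path().
struct DirGuard {
    path: Option<PathBuf>,
}

impl DirGuard {
    fn new(path: PathBuf) -> std::io::Result<Self> {
        fs::create_dir_all(&path)?;
        Ok(Self { path: Some(path) })
    }

    /// Disarms the guard: the directory is persisted and the caller
    /// becomes responsible for cleanup, mirroring TempDir::into_path().
    fn into_path(mut self) -> PathBuf {
        self.path.take().expect("path present until consumed")
    }
}

impl Drop for DirGuard {
    fn drop(&mut self) {
        if let Some(path) = self.path.take() {
            let _ = fs::remove_dir_all(path);
        }
    }
}

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir();

    // Dropped guard: the directory is cleaned up automatically.
    let dropped = base.join("guard-demo-dropped");
    drop(DirGuard::new(dropped.clone())?);
    assert!(!dropped.exists());

    // Consumed guard: the directory survives; the caller must clean up.
    let kept = base.join("guard-demo-kept");
    let path = DirGuard::new(kept.clone())?.into_path();
    assert!(path.exists());
    fs::remove_dir_all(path)?;
    Ok(())
}
```

Once into_path() runs, nothing deletes the directory automatically - which matches the point made about who owns cleanup after LocalPartitionSnapshot is constructed.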
"Found snapshot to bootstrap partition, restoring it",
);
partition_store_manager
    .open_partition_store_from_snapshot(
In restate/crates/rocksdb/src/rock_access.rs, line 156 (at 531b987): fn import_cf(
RocksDB will create hard links if possible - so no real I/O happens as long as the snapshot files and the DB are on the same filesystem. I kept it this way because it was useful for my initial testing, but I think it would be better to set move_files = true now. In the future, we may add a config option to retain the snapshot files on import/export, and even then maybe only if an error occurs.
Yeah, moving by default and copying if explicitly configured sounds like the right setting.
Force-pushed defc6ee to 7291ede
Force-pushed 7291ede to ff6d9ce
Force-pushed 40e74fd to aceb786
Next revision is up and ready for review:
Thanks @pcholakov for creating this PR. I left a few comments below. I hope they help!
let _permit = concurrency_limiter.acquire().await?;
let mut file_data = StreamReader::new(object_store.get(&key).await?.into_stream());
let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
let size = io::copy(&mut file_data, &mut snapshot_file).await?;
Maybe add a sanity check that the downloaded file size matches what is expected from the snapshot metadata?
I want to do another pass and add some kind of checksum to the metadata file, but a size sanity check is a good and cheap start! Thanks!
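Both checks can come from a single pass over the stream: hash and count bytes as each chunk arrives. A std-only sketch under stated assumptions - the hasher here is DefaultHasher purely for self-containment; a real checksum would use something like CRC32C or SHA-256 from an external crate, and the function name is invented:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Folds streamed chunks into a running size and a digest in one pass.
/// Returns the digest if the total size matches the metadata's expectation.
fn verify_stream<'a, I: IntoIterator<Item = &'a [u8]>>(
    chunks: I,
    expected_size: u64,
) -> Result<u64, String> {
    let mut hasher = DefaultHasher::new();
    let mut size = 0u64;
    for chunk in chunks {
        chunk.hash(&mut hasher);
        size += chunk.len() as u64;
    }
    if size != expected_size {
        return Err(format!("size mismatch: got {size}, expected {expected_size}"));
    }
    Ok(hasher.finish())
}

fn main() {
    let chunks: Vec<&[u8]> = vec![b"hello ", b"world"];
    // 11 bytes total: the size check passes.
    assert!(verify_stream(chunks.clone(), 11).is_ok());
    // A truncated download is caught by the size comparison.
    assert!(verify_stream(chunks, 10).is_err());
}
```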
downloads.abort_all();
return Err(e.into());
}
Some(Ok(_)) => {}
Is it intentional to not handle errors returned by the download routine? The _ here is actually an anyhow::Result, which itself can be an error. Check the suggestion below.
I think you are right and we need to handle the inner error case as well. Right now, we might accept an incomplete snapshot as complete if any of the file downloads fails with an error and not a panic.
We probably also want to include in the error message which file failed the download process.
This is a pretty big miss, thanks for catching it!
Updated, let me know how you like it! I might have overdone the error handling a bit 😅
loop {
    match downloads.join_next().await {
        None => {
            break;
        }
        Some(Err(e)) => {
            downloads.abort_all();
            return Err(e.into());
        }
        Some(Ok(_)) => {}
    }
}
Suggested change:

while let Some(result) = downloads.join_next().await {
    match result {
        Err(err) => {
            // join error
        }
        Ok(Err(err)) => {
            // anyhow error
        }
        Ok(Ok(_)) => {
            // download succeeded
        }
    }
}
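The two error layers being distinguished here (join failure vs. the task's own Result) exist with plain std threads as well, where JoinHandle::join() wraps the task's return value in an outer Result. A self-contained, synchronous sketch of draining all handles and surfacing both layers (the payloads are illustrative):

```rust
use std::thread;

fn main() {
    // Each "download" returns Result<usize, String>; join() adds an outer
    // layer that fails only if the task itself panicked.
    let handles: Vec<thread::JoinHandle<Result<usize, String>>> = vec![
        thread::spawn(|| Ok(1024)),
        thread::spawn(|| Err("sst-0007.sst: connection reset".to_string())),
    ];

    let mut failures = Vec::new();
    for handle in handles {
        match handle.join() {
            Err(_) => failures.push("task panicked".to_string()), // outer: join error
            Ok(Err(e)) => failures.push(e),                       // inner: download error
            Ok(Ok(_bytes)) => {}                                  // download succeeded
        }
    }

    // Only if no failures were collected can the snapshot be considered
    // fully downloaded; matching Ok(_) alone would mask the inner error.
    assert_eq!(failures, vec!["sst-0007.sst: connection reset".to_string()]);
}
```

Collecting the failing file names (as suggested below) also gives the error message the context of which download failed.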
Thanks for updating this PR @pcholakov. The changes look good to me. The one thing we need to fix is the handling of failed downloads, as pointed out by Azmy. Otherwise we might consider a snapshot completely downloaded while some SSTs are missing.
if snapshot.key_range.start() > partition_key_range.start()
    || snapshot.key_range.end() < partition_key_range.end()
{
    warn!(
        %partition_id,
        snapshot_range = ?snapshot.key_range,
        partition_range = ?partition_key_range,
        "The snapshot key range does not fully cover the partition key range"
    );
    return Err(RocksError::SnapshotKeyRangeMismatch);
}
That's a good check :-)
};

let latest: LatestSnapshot =
    serde_json::from_slice(latest.bytes().await?.iter().as_slice())?;
What does iter().as_slice() do? Would &latest.bytes().await? work instead?
trace!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
    "{prefix}{partition_id}/{path}/metadata.json",
These paths are probably used on both the write and read paths. Should we share them through a function? That would make it easier to keep them in sync between the two paths.
Are you gonna unify these paths once #2310 gets merged?
I didn't get to this unfortunately - let me track this separately as #2389. I don't want to do any last minute fixes right now for fear of breaking things, and it would be great to merge this into main so it doesn't rot sitting in PR for another week 😅
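The shared-function factoring could be as small as a single helper that both the writer and the reader call; a hypothetical sketch (function and parameter names are invented, not what #2389 will necessarily use):

```rust
/// Builds the object-store key for a snapshot's metadata document, so the
/// write and read paths can never drift apart.
fn snapshot_metadata_key(prefix: &str, partition_id: u64, snapshot_path: &str) -> String {
    format!("{prefix}{partition_id}/{snapshot_path}/metadata.json")
}

fn main() {
    assert_eq!(
        snapshot_metadata_key("snapshots/", 0, "snap_0001"),
        "snapshots/0/snap_0001/metadata.json"
    );
}
```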
info!("Latest snapshot points to a snapshot that was not found in the repository!");
return Ok(None); // arguably this could also be an error
I am wondering whether this does not denote a "corruption" of our snapshots and therefore might even warrant a panic? I mean, we might still be lucky and not encounter a trim gap, because a) we haven't trimmed yet, or b) our applied index is still after the trim point. So I guess this might have been the motivation to return None here? This is actually more resilient than panicking in some cases. The downside is that we might be stuck in a retry loop if we are encountering a trim gap. Maybe raise the log level to warn so that this becomes more visible?
Thanks for flagging this! I haven't thought about this path extensively; let me make it WARN for now, and I'll revisit what's the best way to behave here when I implement trim gap handling. I would be inclined to use some combination of retry-with-backoff while posting a metric that we can't make progress with this partition.
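The retry-with-backoff idea could look roughly like this: a std-only, synchronous sketch with invented names, where a real version would be async and would also emit a "partition stuck" metric on each failed attempt:

```rust
use std::thread;
use std::time::Duration;

/// Retries `op` with exponential backoff, capping the per-attempt delay.
fn retry_with_backoff<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    max_attempts: u32,
) -> Result<T, E> {
    let mut delay = Duration::from_millis(10);
    let mut last_err = None;
    for attempt in 0..max_attempts {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => {
                // A real implementation would record a metric here so a
                // partition that cannot make progress becomes visible.
                last_err = Some(e);
                if attempt + 1 < max_attempts {
                    thread::sleep(delay);
                    delay = (delay * 2).min(Duration::from_secs(1));
                }
            }
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}

fn main() {
    let mut calls = 0;
    // Succeeds on the third attempt, simulating a transiently missing snapshot.
    let result = retry_with_backoff(
        || {
            calls += 1;
            if calls < 3 { Err("snapshot not found") } else { Ok(calls) }
        },
        5,
    );
    assert_eq!(result, Ok(3));
}
```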
pub(crate) async fn get_latest(
    &self,
    partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
Maybe something for the future: it feels as if callers might be interested in why get_latest failed. I could imagine that different errors are handled differently (e.g. retried because the connection to S3 failed vs. an unsupported snapshot format). So anyhow::Result (while convenient) might not be the perfect fit here.
Definitely, as soon as there is any difference in how they're handled, this should become a properly typed error.
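Once callers need to distinguish cases, the anyhow::Result could give way to an enum along these lines. This is a std-only sketch with invented variant names (a real version would likely derive the boilerplate via thiserror):

```rust
use std::fmt;

/// Distinguishes retryable transport failures from permanent ones -
/// exactly the distinction that anyhow::Error erases.
#[derive(Debug)]
enum GetLatestError {
    RepositoryUnreachable(String),
    UnsupportedSnapshotFormat(u32),
}

impl fmt::Display for GetLatestError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::RepositoryUnreachable(cause) => write!(f, "repository unreachable: {cause}"),
            Self::UnsupportedSnapshotFormat(v) => {
                write!(f, "unsupported snapshot format version {v}")
            }
        }
    }
}

impl std::error::Error for GetLatestError {}

impl GetLatestError {
    /// Lets the caller decide between backing off and failing fast.
    fn is_retryable(&self) -> bool {
        matches!(self, Self::RepositoryUnreachable(_))
    }
}

fn main() {
    let transient = GetLatestError::RepositoryUnreachable("connection refused".into());
    assert!(transient.is_retryable());
    assert!(!GetLatestError::UnsupportedSnapshotFormat(9).is_retryable());
}
```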
let _permit = concurrency_limiter.acquire().await?;
let mut file_data = StreamReader::new(object_store.get(&key).await?.into_stream());
let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
let size = io::copy(&mut file_data, &mut snapshot_file).await?;
I like this solution for copying the file in streaming fashion :-)
downloads.abort_all();
return Err(e.into());
While I think it does not make a difference right now for correctness, I would still recommend draining downloads after aborting all, because abort_all does not guarantee that tasks have completely finished (e.g. if something calls spawn_blocking).
Force-pushed ff6d9ce to 7a0242b
Force-pushed 05cf7ea to 1b817bb
Hey folks! I haven't addressed the snapshot key factoring suggestion (which I strongly agree with, but I ran out of time!) but everything else should be covered. PTAL when you have a chance 😊 @tillrohrmann @muhamadazmy
Thanks for creating this PR @pcholakov. The changes look good to me :-) I left a few minor comments which you could resolve before merging.
@@ -163,7 +165,7 @@ impl RocksAccess for rocksdb::DB {
     let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;

     let mut import_opts = ImportColumnFamilyOptions::default();
-    import_opts.set_move_files(false); // keep the snapshot files intact
+    import_opts.set_move_files(true);
Nice :-)
panic!("Snapshot does not match the cluster name of latest snapshot at destination in snapshot id {}! Expected: cluster name=\"{}\", found: \"{}\"",
    snapshot_metadata.snapshot_id,
    self.cluster_name,
    snapshot_metadata.cluster_name);
This might become a problem in the future once we want to start a new cluster from a snapshot of another cluster.
Yeah, definitely! But then we should really be explicit about which cluster we are importing from, once we add that path.
let size = io::copy(&mut file_data, &mut snapshot_file)
    .await
    .map_err(|e| anyhow!("Failed to download snapshot file {:?}: {}", key, e))?;
if size != expected_size as u64 {
nit: I don't think that this is a problem here, but a good practice is imo to use u64::try_from(expected_size).expect("usize to fit into u64"). That way we do observe overflows.
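For reference, the lossless-conversion shape being recommended, contrasted with the as cast:

```rust
fn main() {
    let expected_size: usize = 4096;

    // `as u64` would silently wrap if the conversion were ever lossy;
    // try_from fails loudly instead (on 64-bit targets usize -> u64
    // always fits, so the expect documents the assumption).
    let expected = u64::try_from(expected_size).expect("usize to fit into u64");
    assert_eq!(expected, 4096u64);

    // The same pattern in the narrowing direction shows why it matters:
    // here the conversion genuinely can fail.
    assert!(u8::try_from(300usize).is_err());
}
```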
let snapshot = if snapshot_repository.is_none() {
    debug!(
        partition_id = %partition_id,
        "No snapshot repository configured",
    );
    None
} else {
    debug!(
        partition_id = %partition_id,
        "Looking for partition snapshot from which to bootstrap partition store",
    );
    snapshot_repository.expect("is some").get_latest(partition_id).await?
};
nit: You could use if let Some(snapshot_repository) = snapshot_repository { } else { } instead of is_none and then expect.
I tried to get it to work this way initially but the code ended up being deeply nested and looked less readable to my eyes. I'll give this another pass when I come back to this.
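A self-contained, synchronous sketch of the if let shape being suggested, with Option<&str> standing in for the repository handle and the debug! lines reduced to comments:

```rust
fn main() {
    let snapshot_repository: Option<&str> = Some("s3://snapshots");

    // `if let Some(..)` binds the value directly, avoiding the
    // is_none() check followed by expect("is some").
    let snapshot = if let Some(repo) = snapshot_repository {
        // "Looking for partition snapshot from which to bootstrap partition store"
        Some(format!("latest snapshot from {repo}"))
    } else {
        // "No snapshot repository configured"
        None
    };

    assert_eq!(
        snapshot,
        Some("latest snapshot from s3://snapshots".to_string())
    );
}
```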
Force-pushed 3a87e07 to aee1a12
Force-pushed 1b817bb to 69b4a9a
Thanks, Till! Punting on a couple of the comments so I can get this into main - rather not let it sit in PR for another week. I really appreciate your feedback throughout this cycle!
With this change, Partition Processor startup now checks the snapshot repository
for a partition snapshot before creating a blank store database. If a recent
snapshot is available, we will restore that instead of replaying the log from
the beginning.
This PR builds on #2310
Closes: #2000
Testing
Created a snapshot by running restatectl create-snapshot -p 0, then dropped the partition CF with rocksdb_ldb drop_column_family --db=./restate-data/.../db data-0.
Running restate-server correctly restores the most recent available snapshot: