From 1dca687d21a0cdfbf82f8bdaa93dd8f9aa31149a Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Thu, 28 Jan 2021 15:43:46 +0800
Subject: [PATCH] backup: support split big region into small backup files (#9283) (#9448)

cherry-pick #9283 to release-4.0

---

### What problem does this PR solve?

Issue Number: close #9144

Problem Summary:
BR reads all the data of a region and fills it into an SST writer, but that writer is in-memory. If there is a huge region, TiKV may crash with OOM because it keeps all of that region's data in memory at once.

### What is changed and how it works?

What's Changed:
Record the accumulated size of the written txn entries. When it reaches `sst-max-size` (which defaults to `region_max_size`), save the data cached in RocksDB to an SST file and then switch to the next file.

### Related changes

- Need to cherry-pick to the release branch

### Check List

Tests
- Unit test
- Integration test
- Manual test (add detailed scripts or steps below)

1. Set `sst-max-size` to 15MiB.
```
mysql> select * from CLUSTER_CONFIG where `TYPE`="tikv";
+------+-----------------+---------------------+-------+
| TYPE | INSTANCE        | KEY                 | VALUE |
+------+-----------------+---------------------+-------+
| tikv | 127.0.0.1:20160 | backup.batch-size   | 8     |
| tikv | 127.0.0.1:20160 | backup.num-threads  | 9     |
| tikv | 127.0.0.1:20160 | backup.sst-max-size | 15MiB |
...
```
2. Back up around 100MB of data (without compaction) successfully.
```
$ ./br backup full -s ./backup --pd http://127.0.0.1:2379
Full backup <--------------------------------------------------------------------------------------------------------------------------------------------------------------------> 100.00%
Checksum <-----------------------------------------------------------------------------------------------------------------------------------------------------------------------> 100.00%
[2020/12/31 14:39:12.534 +08:00] [INFO] [collector.go:60] ["Full backup Success summary: total backup ranges: 2, total success: 2, total failed: 0, total take(Full backup time): 4.273097395s, total take(real time): 8.133315406s, total kv: 8000000, total size(MB): 361.27, avg speed(MB/s): 84.55"] ["backup checksum"=901.754111ms] ["backup fast checksum"=6.09384ms] ["backup total regions"=10] [BackupTS=421893700168974340] [Size=48023090]
```
3. The big region is split into several files:
```
-rw-r--r-- 1 * * 1.5M Dec 31 14:39 1_60_28_74219326eeb0a4ae3a0f5190f7784132bb0e44791391547ef66862aaeb668579_1609396745730_write.sst
-rw-r--r-- 1 * * 1.2M Dec 31 14:39 1_60_28_b7a5509d9912c66a21589d614cfc8828acd4051a7eeea3f24f5a7b337b5a389e_1609396746062_write.sst
-rw-r--r-- 1 * * 1.5M Dec 31 14:39 1_60_28_cdcc2ce1c18a30a2b779b574f64de9f0e3be81c2d8720d5af0a9ef9633f8fbb7_1609396745429_write.sst
-rw-r--r-- 1 * * 2.4M Dec 31 14:39 1_62_28_4259e616a6e7b70c33ee64af60230f3e4160af9ac7aac723f033cddf6681826a_1609396747038_write.sst
-rw-r--r-- 1 * * 2.4M Dec 31 14:39 1_62_28_5d0de44b65fb805e45c93278661edd39792308c8ce90855b54118c4959ec9f16_1609396746731_write.sst
-rw-r--r-- 1 * * 2.4M Dec 31 14:39 1_62_28_ef7ab4b5471b088ee909870e316d926f31f4f6ec771754690eac61af76e8782c_1609396747374_write.sst
-rw-r--r-- 1 * * 1.5M Dec 31 14:39 1_64_29_74211aae8215fe9cde8bd7ceb8494afdcc18e5c6a8c5830292a577a9859d38e1_1609396746671_write.sst
-rw-r--r-- 1 * * 1.2M Dec 31 14:39 1_64_29_81e152c98742938c1662241fac1c841319029e800da6881d799a16723cb42888_1609396747010_write.sst
-rw-r--r-- 1 * * 1.5M Dec 31 14:39 1_64_29_ce0dde9826aee9e5ccac0a516f18b9871d3897effd559ff7450b8e56ac449bbd_1609396746349_write.sst
-rw-r--r-- 1 * *   78 Dec 31 14:39 backup.lock
-rw-r--r-- 1 * * 229K Dec 31 14:39 backupmeta
```
4. Restore the backed-up data. It works successfully and passes the manual check.
```
./br restore full -s ./backup --pd http://127.0.0.1:2379
Full restore <-------------------------------------------------------------------------------------------------------------------------------------------------------------------> 100.00%
[2020/12/31 14:42:49.983 +08:00] [INFO] [collector.go:60] ["Full restore Success summary: total restore files: 27, total success: 27, total failed: 0, total take(Full restore time): 5.063048828s, total take(real time): 7.84620924s, total kv: 8000000, total size(MB): 361.27, avg speed(MB/s): 71.36"] ["split region"=26.217737ms] ["restore checksum"=4.10792638s] ["restore ranges"=26] [Size=48023090]
```

### Release note

- Fix the issue that TiKV may run out of memory (OOM) when backing up a huge region.
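### Illustrative sketch of the split logic

The snippet below is not the TiKV code; it is a minimal, self-contained Rust sketch of the idea described in "What is changed and how it works": accumulate the size of the written entries and start a new backup file once the size threshold is reached, recording each file's key range. `FileSpan` and `split_backup` are hypothetical names, the threshold is checked per entry here for simplicity (the actual change checks once per scanned batch), and real SST writing/upload is elided.

```rust
// Hypothetical stand-ins for illustration only; not TiKV types.
#[derive(Debug)]
struct FileSpan {
    start_key: Vec<u8>,
    end_key: Vec<u8>,
    bytes: u64,
}

fn split_backup(
    entries: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>, // (key, value) pairs in key order
    range_start: Vec<u8>,
    range_end: Vec<u8>,
    sst_max_size: u64,
) -> Vec<FileSpan> {
    let mut files = Vec::new();
    let mut cur_start = range_start;
    let mut cur_bytes = 0u64;

    for (key, value) in entries {
        // Once the current file reaches the threshold, close it at this key
        // and switch to the next file (the `need_split_keys` case).
        if cur_bytes >= sst_max_size {
            files.push(FileSpan {
                start_key: cur_start.clone(),
                end_key: key.clone(),
                bytes: cur_bytes,
            });
            cur_start = key.clone();
            cur_bytes = 0;
        }
        cur_bytes += (key.len() + value.len()) as u64;
    }

    // Flush whatever remains (the `need_flush_keys` case); its end key is the
    // end of the whole backup range.
    if cur_bytes > 0 {
        files.push(FileSpan {
            start_key: cur_start,
            end_key: range_end,
            bytes: cur_bytes,
        });
    }
    files
}

fn main() {
    // 10 entries of ~101 bytes each with a 300-byte threshold yields several files.
    let entries = (0u8..10).map(|i| (vec![i], vec![0u8; 100]));
    for f in split_backup(entries, vec![], vec![0xff], 300) {
        println!("{:?}", f);
    }
}
```

In the actual change below, `BackupWriter::need_split_keys` performs the threshold check, `BackupWriterBuilder::build` opens the writer for the next file keyed by the first key of the following batch, and `need_flush_keys` flushes the remainder at the end of the range.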
--- components/backup/src/endpoint.rs | 155 +++++++++++------- components/backup/src/lib.rs | 2 +- components/backup/src/writer.rs | 98 ++++++++++- .../backup/tests/integrations/test_backup.rs | 137 +++++++++++++++- etc/config-template.toml | 8 + src/config.rs | 16 ++ src/storage/txn/store.rs | 10 ++ tests/integrations/config/mod.rs | 1 + tests/integrations/config/test-custom.toml | 1 + 9 files changed, 358 insertions(+), 70 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index c0c1a8cec4be..1205fec53883 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -30,6 +30,8 @@ use yatp::task::callback::{Handle, TaskCell}; use yatp::ThreadPool; use crate::metrics::*; +use crate::writer::BackupWriterBuilder; +use crate::Error; use crate::*; const BACKUP_BATCH_LIMIT: usize = 1024; @@ -142,11 +144,12 @@ impl BackupRange { /// Get entries from the scanner and save them to storage fn backup( &self, - writer: &mut BackupWriter, + writer_builder: BackupWriterBuilder, engine: &E, backup_ts: TimeStamp, begin_ts: TimeStamp, - ) -> Result { + storage: &LimitedStorage, + ) -> Result<(Vec, Statistics)> { assert!(!self.is_raw_kv); let mut ctx = Context::default(); @@ -181,7 +184,17 @@ impl BackupRange { .unwrap(); let start_scan = Instant::now(); + let mut files: Vec = Vec::with_capacity(2); let mut batch = EntryBatch::with_capacity(BACKUP_BATCH_LIMIT); + let mut last_key = self + .start_key + .clone() + .map_or_else(Vec::new, |k| k.into_raw().unwrap()); + let mut cur_key = self + .end_key + .clone() + .map_or_else(Vec::new, |k| k.into_raw().unwrap()); + let mut writer = writer_builder.build(last_key.clone())?; loop { if let Err(e) = scanner.scan_entries(&mut batch) { error!(?e; "backup scan entries failed"); @@ -191,6 +204,48 @@ impl BackupRange { break; } debug!("backup scan entries"; "len" => batch.len()); + + if writer.need_split_keys() { + let res = { + batch.iter().next().map_or_else( + || Err(Error::Other(box_err!("get entry error"))), + |x| match x.to_key() { + Ok(k) => { + cur_key = k.into_raw().unwrap(); + writer_builder.build(cur_key.clone()) + } + Err(e) => { + error!(?e; "backup save file failed"); + Err(Error::Other(box_err!("Decode error: {:?}", e))) + } + }, + ) + }; + match writer.save(&storage.storage) { + Ok(mut split_files) => { + for file in split_files.iter_mut() { + file.set_start_key(last_key.clone()); + file.set_end_key(cur_key.clone()); + } + last_key = cur_key.clone(); + files.append(&mut split_files); + } + Err(e) => { + error!(?e; "backup save file failed"); + return Err(e); + } + } + match res { + Ok(w) => { + writer = w; + } + Err(e) => { + error!(?e; "backup writer failed"); + return Err(e); + } + } + } + // Build sst files. 
if let Err(e) = writer.write(batch.drain(), true) { error!(?e; "backup build sst failed"); @@ -200,8 +255,29 @@ impl BackupRange { BACKUP_RANGE_HISTOGRAM_VEC .with_label_values(&["scan"]) .observe(start_scan.elapsed().as_secs_f64()); + + if writer.need_flush_keys() { + match writer.save(&storage.storage) { + Ok(mut split_files) => { + cur_key = self + .end_key + .clone() + .map_or_else(Vec::new, |k| k.into_raw().unwrap()); + for file in split_files.iter_mut() { + file.set_start_key(last_key.clone()); + file.set_end_key(cur_key.clone()); + } + files.append(&mut split_files); + } + Err(e) => { + error!(?e; "backup save file failed"); + return Err(e); + } + } + } + let stat = scanner.take_statistics(); - Ok(stat) + Ok((files, stat)) } fn backup_raw( @@ -264,44 +340,6 @@ impl BackupRange { Ok(statistics) } - fn backup_to_file( - &self, - engine: &E, - db: Arc, - storage: &LimitedStorage, - file_name: String, - backup_ts: TimeStamp, - start_ts: TimeStamp, - compression_type: Option, - compression_level: i32, - ) -> Result<(Vec, Statistics)> { - let mut writer = match BackupWriter::new( - db, - &file_name, - storage.limiter.clone(), - compression_type, - compression_level, - ) { - Ok(w) => w, - Err(e) => { - error!(?e; "backup writer failed"); - return Err(e); - } - }; - let stat = match self.backup(&mut writer, engine, backup_ts, start_ts) { - Ok(s) => s, - Err(e) => return Err(e), - }; - // Save sst files to storage. - match writer.save(&storage.storage) { - Ok(files) => Ok((files, stat)), - Err(e) => { - error!(?e; "backup save file failed"); - Err(e) - } - } - } - fn backup_raw_kv_to_file( &self, engine: &E, @@ -586,6 +624,7 @@ impl Endpoint { let db = self.db.clone(); let store_id = self.store_id; let batch_size = self.config_manager.0.read().unwrap().batch_size; + let sst_max_size = self.config_manager.0.read().unwrap().sst_max_size.0; // TODO: make it async. self.pool.borrow_mut().spawn(move || { @@ -654,17 +693,17 @@ impl Endpoint { brange.end_key.map_or_else(Vec::new, |k| k.into_encoded()), ) } else { + let writer_builder = BackupWriterBuilder::new( + store_id, + storage.limiter.clone(), + brange.region.clone(), + db.clone(), + ct, + request.compression_level, + sst_max_size, + ); ( - brange.backup_to_file( - &engine, - db.clone(), - &storage, - name, - backup_ts, - start_ts, - ct, - request.compression_level, - ), + brange.backup(writer_builder, &engine, backup_ts, start_ts, &storage), brange .start_key .map_or_else(Vec::new, |k| k.into_raw().unwrap()), @@ -692,8 +731,10 @@ impl Endpoint { "details" => ?stat); for file in files.iter_mut() { - file.set_start_key(start_key.clone()); - file.set_end_key(end_key.clone()); + if is_raw_kv { + file.set_start_key(start_key.clone()); + file.set_end_key(end_key.clone()); + } file.set_start_version(start_ts.into_inner()); file.set_end_version(end_ts.into_inner()); } @@ -817,7 +858,7 @@ fn get_max_start_key(start_key: Option<&Key>, region: &Region) -> Option { /// A name consists with five parts: store id, region_id, a epoch version, the hash of range start key and timestamp. /// range start key is used to keep the unique file name for file, to handle different tables exists on the same region. /// local unix timestamp is used to keep the unique file name for file, to handle receive the same request after connection reset. 
-fn backup_file_name(store_id: u64, region: &Region, key: Option) -> String { +pub fn backup_file_name(store_id: u64, region: &Region, key: Option) -> String { let start = SystemTime::now(); let since_the_epoch = start .duration_since(UNIX_EPOCH) @@ -871,6 +912,7 @@ pub mod tests { use txn_types::SHORT_VALUE_MAX_LEN; use super::*; + use tikv_util::config::ReadableSize; #[derive(Clone)] pub struct MockRegionInfoProvider { @@ -939,6 +981,7 @@ pub mod tests { BackupConfig { num_threads: 4, batch_size: 8, + sst_max_size: ReadableSize::mb(144), }, ), ) @@ -1191,8 +1234,10 @@ pub mod tests { let resp = resp.unwrap(); assert!(!resp.has_error(), "{:?}", resp); let file_len = if *len <= SHORT_VALUE_MAX_LEN { 1 } else { 2 }; + let files = resp.get_files(); + info!("{:?}", files); assert_eq!( - resp.get_files().len(), + files.len(), file_len, /* default and write */ "{:?}", resp diff --git a/components/backup/src/lib.rs b/components/backup/src/lib.rs index aed4097eb639..e21baf2792db 100644 --- a/components/backup/src/lib.rs +++ b/components/backup/src/lib.rs @@ -16,7 +16,7 @@ mod metrics; mod service; mod writer; -pub use endpoint::{Endpoint, Task}; +pub use endpoint::{backup_file_name, Endpoint, Task}; pub use errors::{Error, Result}; pub use service::Service; pub use writer::{BackupRawKVWriter, BackupWriter}; diff --git a/components/backup/src/writer.rs b/components/backup/src/writer.rs index f9319d373606..c46de5edf0d7 100644 --- a/components/backup/src/writer.rs +++ b/components/backup/src/writer.rs @@ -10,13 +10,14 @@ use engine_traits::{ExternalSstFileInfo, SstCompressionType, SstWriter, SstWrite use external_storage::ExternalStorage; use futures_util::io::AllowStdIo; use kvproto::backup::File; +use kvproto::metapb::Region; use tikv::coprocessor::checksum_crc64_xor; use tikv::storage::txn::TxnEntry; use tikv_util::{self, box_err, file::Sha256Reader, time::Limiter}; use txn_types::KvPair; use crate::metrics::*; -use crate::{Error, Result}; +use crate::{backup_file_name, Error, Result}; struct Writer { writer: RocksSstWriter, @@ -110,12 +111,59 @@ impl Writer { } } +pub struct BackupWriterBuilder { + store_id: u64, + limiter: Limiter, + region: Region, + db: Arc, + compression_type: Option, + compression_level: i32, + sst_max_size: u64, +} + +impl BackupWriterBuilder { + pub fn new( + store_id: u64, + limiter: Limiter, + region: Region, + db: Arc, + compression_type: Option, + compression_level: i32, + sst_max_size: u64, + ) -> BackupWriterBuilder { + Self { + store_id, + limiter, + region, + db, + compression_type, + compression_level, + sst_max_size, + } + } + + pub fn build(&self, start_key: Vec) -> Result { + let key = tikv_util::file::sha256(&start_key).ok().map(hex::encode); + let store_id = self.store_id; + let name = backup_file_name(store_id, &self.region, key); + BackupWriter::new( + self.db.clone(), + &name, + self.compression_type, + self.compression_level, + self.limiter.clone(), + self.sst_max_size, + ) + } +} + /// A writer writes txn entries into SST files. 
pub struct BackupWriter { name: String, default: Writer, write: Writer, limiter: Limiter, + sst_max_size: u64, } impl BackupWriter { @@ -123,9 +171,10 @@ impl BackupWriter { pub fn new( db: Arc, name: &str, - limiter: Limiter, compression_type: Option, compression_level: i32, + limiter: Limiter, + sst_max_size: u64, ) -> Result { let default = RocksSstWriterBuilder::new() .set_in_memory(true) @@ -147,6 +196,7 @@ impl BackupWriter { default: Writer::new(default), write: Writer::new(write), limiter, + sst_max_size, }) } @@ -210,6 +260,14 @@ impl BackupWriter { .observe(start.elapsed().as_secs_f64()); Ok(files) } + + pub fn need_split_keys(&self) -> bool { + self.default.total_bytes + self.write.total_bytes >= self.sst_max_size + } + + pub fn need_flush_keys(&self) -> bool { + self.default.total_bytes + self.write.total_bytes > 0 + } } /// A writer writes Raw kv into SST files. @@ -290,6 +348,7 @@ impl BackupRawKVWriter { mod tests { use super::*; use engine_traits::Iterable; + use raftstore::store::util::new_peer; use std::collections::BTreeMap; use std::f64::INFINITY; use std::path::Path; @@ -351,14 +410,31 @@ mod tests { let storage = external_storage::create_storage(&backend).unwrap(); // Test empty file. - let mut writer = - BackupWriter::new(db.clone(), "foo", Limiter::new(INFINITY), None, 0).unwrap(); + let mut r = kvproto::metapb::Region::default(); + r.set_id(1); + r.mut_peers().push(new_peer(1, 1)); + let mut writer = BackupWriter::new( + db.clone(), + "foo", + None, + 0, + Limiter::new(INFINITY), + 144 * 1024 * 1024, + ) + .unwrap(); writer.write(vec![].into_iter(), false).unwrap(); assert!(writer.save(&storage).unwrap().is_empty()); // Test write only txn. - let mut writer = - BackupWriter::new(db.clone(), "foo1", Limiter::new(INFINITY), None, 0).unwrap(); + let mut writer = BackupWriter::new( + db.clone(), + "foo1", + None, + 0, + Limiter::new(INFINITY), + 144 * 1024 * 1024, + ) + .unwrap(); writer .write( vec![TxnEntry::Commit { @@ -384,7 +460,15 @@ mod tests { ); // Test write and default. - let mut writer = BackupWriter::new(db, "foo2", Limiter::new(INFINITY), None, 0).unwrap(); + let mut writer = BackupWriter::new( + db, + "foo2", + None, + 0, + Limiter::new(INFINITY), + 144 * 1024 * 1024, + ) + .unwrap(); writer .write( vec![ diff --git a/components/backup/tests/integrations/test_backup.rs b/components/backup/tests/integrations/test_backup.rs index 547aa0129900..865c97f02716 100644 --- a/components/backup/tests/integrations/test_backup.rs +++ b/components/backup/tests/integrations/test_backup.rs @@ -34,6 +34,7 @@ use tikv::coprocessor::dag::TiKVStorage; use tikv::storage::kv::Engine; use tikv::storage::SnapshotStore; use tikv_util::collections::HashMap; +use tikv_util::config::ReadableSize; use tikv_util::file::calc_crc32_bytes; use tikv_util::worker::Worker; use tikv_util::HandyRwLock; @@ -69,7 +70,7 @@ macro_rules! retry_req { } impl TestSuite { - fn new(count: usize) -> TestSuite { + fn new(count: usize, sst_max_size: u64) -> TestSuite { super::init(); let mut cluster = new_server_cluster(1, count); // Increase the Raft tick interval to make this test case running reliably. 
@@ -88,6 +89,7 @@ impl TestSuite { BackupConfig { num_threads: 4, batch_size: 8, + sst_max_size: ReadableSize(sst_max_size), }, ); let mut worker = Worker::new(format!("backup-{}", id)); @@ -347,9 +349,39 @@ fn make_unique_dir(path: &Path) -> PathBuf { unique } +fn assert_same_file_name(s1: String, s2: String) { + let tokens1: Vec<&str> = s1.split('_').collect(); + let tokens2: Vec<&str> = s2.split('_').collect(); + assert_eq!(tokens1.len(), tokens2.len()); + // 2_1_1_e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855_1609407693105_write.sst + // 2_1_1_e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855_1609407693199_write.sst + // should be equal + for i in 0..tokens1.len() { + if i != 4 { + assert_eq!(tokens1[i], tokens2[i]); + } + } +} + +fn assert_same_files(files1: Vec, files2: Vec) { + assert_eq!(files1.len(), files2.len()); + + // After https://github.com/tikv/tikv/pull/8707 merged. + // the backup file name will based on local timestamp. + // so the two backup's file name may not be same, we should skip this check. + for i in 0..files1.len() { + let mut f1 = files1[i].clone(); + let mut f2 = files2[i].clone(); + assert_same_file_name(f1.name, f2.name); + f1.name = "".to_string(); + f2.name = "".to_string(); + assert_eq!(f1, f2); + } +} + #[test] fn test_backup_and_import() { - let mut suite = TestSuite::new(3); + let mut suite = TestSuite::new(3, 144 * 1024 * 1024); // 3 version for each key. let key_count = 60; suite.must_kv_put(key_count, 3); @@ -441,14 +473,105 @@ fn test_backup_and_import() { &make_unique_dir(tmp.path()), ); let resps3 = block_on(rx.collect::>()); - assert_eq!(files1, resps3[0].files); + assert_same_files(files1.into_vec(), resps3[0].files.clone().into_vec()); + + suite.stop(); +} + +#[test] +fn test_backup_huge_range_and_import() { + let mut suite = TestSuite::new(3, 100); + // 3 version for each key. + // make sure we will have two batch files + let key_count = 1024 * 3 / 2; + suite.must_kv_put(key_count, 3); + + // Push down backup request. + let tmp = Builder::new().tempdir().unwrap(); + let backup_ts = suite.alloc_ts(); + let storage_path = make_unique_dir(tmp.path()); + let rx = suite.backup( + vec![], // start + vec![], // end + 0.into(), // begin_ts + backup_ts, + &storage_path, + ); + let resps1 = block_on(rx.collect::>()); + // Only leader can handle backup. + assert_eq!(resps1.len(), 1); + let files1 = resps1[0].files.clone(); + // Short value is piggybacked in write cf, so we get 1 sst at least. + assert!(!resps1[0].get_files().is_empty()); + assert_eq!(files1.len(), 2); + assert_ne!(files1[0].start_key, files1[0].end_key); + assert_ne!(files1[1].start_key, files1[1].end_key); + assert_eq!(files1[0].end_key, files1[1].start_key); + + // Use importer to restore backup files. 
+ let backend = make_local_backend(&storage_path); + let storage = create_storage(&backend).unwrap(); + let region = suite.cluster.get_region(b""); + let mut sst_meta = SstMeta::default(); + sst_meta.region_id = region.get_id(); + sst_meta.set_region_epoch(region.get_region_epoch().clone()); + let mut metas = vec![]; + for f in files1.clone().into_iter() { + let mut reader = storage.read(&f.name); + let mut content = vec![]; + block_on(reader.read_to_end(&mut content)).unwrap(); + let mut m = sst_meta.clone(); + m.crc32 = calc_crc32_bytes(&content); + m.length = content.len() as _; + // set different uuid for each file + m.set_uuid(uuid::Uuid::new_v4().as_bytes().to_vec()); + m.cf_name = name_to_cf(&f.name).to_owned(); + metas.push((m, content)); + } + + for (m, c) in &metas { + for importer in suite.cluster.sim.rl().importers.values() { + let mut f = importer.create(m).unwrap(); + f.append(c).unwrap(); + f.finish().unwrap(); + } + + // Make ingest command. + let mut ingest = Request::default(); + ingest.set_cmd_type(CmdType::IngestSst); + ingest.mut_ingest_sst().set_sst(m.clone()); + let mut header = RaftRequestHeader::default(); + let leader = suite.context.get_peer().clone(); + header.set_peer(leader); + header.set_region_id(suite.context.get_region_id()); + header.set_region_epoch(suite.context.get_region_epoch().clone()); + let mut cmd = RaftCmdRequest::default(); + cmd.set_header(header); + cmd.mut_requests().push(ingest); + let resp = suite + .cluster + .call_command_on_leader(cmd, Duration::from_secs(5)) + .unwrap(); + assert!(!resp.get_header().has_error(), resp); + } + + // Backup file should have same contents. + let rx = suite.backup( + vec![], // start + vec![], // end + 0.into(), // begin_ts + backup_ts, + &make_unique_dir(tmp.path()), + ); + let resps3 = block_on(rx.collect::>()); + assert_same_files(files1.into_vec(), resps3[0].files.clone().into_vec()); suite.stop(); } #[test] fn test_backup_meta() { - let mut suite = TestSuite::new(3); + let mut suite = TestSuite::new(3, 144 * 1024 * 1024); // 3 version for each key. let key_count = 60; suite.must_kv_put(key_count, 3); @@ -492,7 +615,7 @@ fn test_backup_meta() { #[test] fn test_backup_rawkv() { - let mut suite = TestSuite::new(3); + let mut suite = TestSuite::new(3, 144 * 1024 * 1024); let key_count = 60; let cf = String::from(CF_DEFAULT); @@ -599,7 +722,7 @@ fn test_backup_rawkv() { #[test] fn test_backup_raw_meta() { - let suite = TestSuite::new(3); + let suite = TestSuite::new(3, 144 * 1024 * 1024); let key_count: u64 = 60; let cf = String::from(CF_DEFAULT); @@ -648,7 +771,7 @@ fn test_backup_raw_meta() { #[test] fn test_invalid_external_storage() { - let mut suite = TestSuite::new(1); + let mut suite = TestSuite::new(1, 144 * 1024 * 1024); // Put some data. suite.must_kv_put(3, 1); diff --git a/etc/config-template.toml b/etc/config-template.toml index cfed8847ba66..f5fffe1195fb 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -868,6 +868,14 @@ ## The default value is set to min(CPU_NUM * 0.75, 32). # num-threads = 24 +## Number of ranges to backup in one batch. +# batch = 8 + +## When Backup region [a,e) size exceeds `sst-max-size`, it will be backuped into several Files [a,b), +## [b,c), [c,d), [d,e) and the size of [a,b), [b,c), [c,d) will be `sst-max-size` (or a +## little larger). 
+# sst-max-size = "144MB" + [pessimistic-txn] ## Enable pessimistic transaction # enabled = true diff --git a/src/config.rs b/src/config.rs index 472837d4e177..f79c62a9b7c6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1982,6 +1982,7 @@ mod readpool_tests { pub struct BackupConfig { pub num_threads: usize, pub batch_size: usize, + pub sst_max_size: ReadableSize, } impl BackupConfig { @@ -1998,11 +1999,13 @@ impl BackupConfig { impl Default for BackupConfig { fn default() -> Self { + let default_coprocessor = CopConfig::default(); let cpu_num = SysQuota::new().cpu_cores_quota(); Self { // use at most 75% of vCPU by default num_threads: (cpu_num * 0.75).clamp(1.0, 32.0) as usize, batch_size: 8, + sst_max_size: default_coprocessor.region_max_size, } } } @@ -2344,6 +2347,19 @@ impl TiKvConfig { + self.raftdb.defaultcf.block_cache_size.0, }); } + if self.backup.sst_max_size.0 < default_coprocessor.region_max_size.0 / 10 { + warn!( + "override backup.sst-max-size with min sst-max-size, {:?}", + default_coprocessor.region_max_size / 10 + ); + self.backup.sst_max_size = default_coprocessor.region_max_size / 10; + } else if self.backup.sst_max_size.0 > default_coprocessor.region_max_size.0 * 2 { + warn!( + "override backup.sst-max-size with max sst-max-size, {:?}", + default_coprocessor.region_max_size * 2 + ); + self.backup.sst_max_size = default_coprocessor.region_max_size * 2; + } self.readpool.adjust_use_unified_pool(); } diff --git a/src/storage/txn/store.rs b/src/storage/txn/store.rs index 65c3d121b455..cf11027efb84 100644 --- a/src/storage/txn/store.rs +++ b/src/storage/txn/store.rs @@ -162,6 +162,16 @@ impl TxnEntry { _ => unreachable!(), } } + /// This method will generate this kv pair's key + pub fn to_key(&self) -> Result { + match self { + TxnEntry::Commit { write, .. } => Ok(Key::from_encoded_slice( + Key::truncate_ts_for(&write.0).unwrap(), + )), + // Prewrite are not support + _ => unreachable!(), + } + } } /// A batch of transaction entries. diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 9a5ce005d79c..e607615ad400 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -595,6 +595,7 @@ fn test_serde_custom_tikv_config() { value.backup = BackupConfig { num_threads: 456, batch_size: 7, + sst_max_size: ReadableSize::mb(789), }; value.import = ImportConfig { num_threads: 123, diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index 1048f42ce44c..dd2802305ece 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -502,6 +502,7 @@ type = "plaintext" [backup] num-threads = 456 batch-size = 7 +sst-max-size = "789MB" [import] num-threads = 123
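
Note: the new `sst-max-size` option added to `etc/config-template.toml` and `tests/integrations/config/test-custom.toml` above lives under the `[backup]` section of the TiKV configuration file. A minimal illustrative snippet follows (the values are examples, not recommendations); per the validation added in `src/config.rs`, values outside roughly one tenth to twice the default `region-max-size` are clamped.

```toml
[backup]
# Number of backup worker threads.
num-threads = 4
# Number of ranges to back up in one batch.
batch-size = 8
# A region larger than this is split into multiple SST files during backup.
sst-max-size = "144MB"
```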