From 43331db9838094fc0c3dfe1f878c4bba0e902466 Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Sat, 4 Feb 2023 01:22:16 +0800
Subject: [PATCH] fix: reduce allocations in copy_to_overlay

---
 Cargo.toml       |   2 +-
 src/column.rs    |  67 ++++++++++++++-------------
 src/db.rs        | 117 +++++++++++++++++++++++++++++------------
 src/migration.rs |  22 ++++-----
 4 files changed, 119 insertions(+), 89 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e88d0850..4e18031a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ snap = "1"
 loom = { version = "0.5.1", optional = true }
 
 [dev-dependencies]
-env_logger = "0.9.0"
+env_logger = "0.10.0"
 fdlimit = "0.2.1"
 rand = { version = "0.8.2", features = ["small_rng"] }
 tempfile = "3.2"
diff --git a/src/column.rs b/src/column.rs
index 036f861b..f70ec86a 100644
--- a/src/column.rs
+++ b/src/column.rs
@@ -4,7 +4,7 @@ use crate::{
 	btree::BTreeTable,
 	compress::Compress,
-	db::{check::CheckDisplay, Operation},
+	db::{check::CheckDisplay, Operation, ValuePtr},
 	display::hex,
 	error::{Error, Result},
 	index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
@@ -167,14 +167,14 @@ impl HashColumn {
 			if self.collect_stats {
 				self.stats.query_hit(tier);
 			}
-			return Ok(Some(value))
+			return Ok(Some(value));
 		}
 		for r in &self.reindex.read().queue {
 			if let Some((tier, value)) = self.get_in_index(key, r, values, log)? {
 				if self.collect_stats {
 					self.stats.query_hit(tier);
 				}
-				return Ok(Some(value))
+				return Ok(Some(value));
 			}
 		}
 		if self.collect_stats {
@@ -238,7 +238,7 @@ impl Column {
 			tables.tables[size_tier].query(&mut key, address.offset(), log)?
 		{
 			let value = if compressed { tables.compression.decompress(&value)? } else { value };
-			return Ok(Some((size_tier as u8, value)))
+			return Ok(Some((size_tier as u8, value)));
 		}
 		Ok(None)
 	}
@@ -419,7 +419,7 @@ impl HashColumn {
 	) -> Result<PlanOutcome> {
 		if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
 			log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
-			return Ok(PlanOutcome::Skipped)
+			return Ok(PlanOutcome::Skipped);
 		}
 		let mut outcome = PlanOutcome::Written;
 		while let PlanOutcome::NeedReindex =
@@ -448,7 +448,7 @@ impl HashColumn {
 				&table_key,
 				log,
 			)? {
-				return Ok(Some((index, sub_index, existing_address)))
+				return Ok(Some((index, sub_index, existing_address)));
 			}
 			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
 
@@ -468,7 +468,7 @@ impl HashColumn {
 		while !existing_entry.is_empty() {
 			let existing_address = existing_entry.address(index.id.index_bits());
 			if existing_address == address {
-				return Ok(true)
+				return Ok(true);
 			}
 			let (next_entry, next_index) = index.get(key, sub_index + 1, log)?;
 			existing_entry = next_entry;
@@ -484,14 +484,14 @@ impl HashColumn {
 		log: &LogWriter,
 	) -> Result> {
 		if let Some(r) = Self::search_index(key, &tables.index, tables, log)? {
-			return Ok(Some(r))
+			return Ok(Some(r));
 		}
 		// Check old indexes
 		// TODO: don't search if index precedes reindex progress
 		for index in &reindex.queue {
 			if let Some(r) = Self::search_index(key, index, tables, log)? {
-				return Ok(Some(r))
+				return Ok(Some(r));
 			}
 		}
 		Ok(None)
 	}
@@ -498,7 +498,7 @@ impl HashColumn {
 
 	pub fn write_plan(
 		&self,
-		change: &Operation<Key, Vec<u8>>,
+		change: &Operation<Key, ValuePtr>,
 		log: &mut LogWriter,
 	) -> Result<PlanOutcome> {
 		let tables = self.tables.upgradable_read();
@@ -509,7 +509,8 @@ impl HashColumn {
 		} else {
 			match change {
 				Operation::Set(key, value) => {
-					let (r, _, _) = self.write_plan_new(tables, reindex, key, value, log)?;
+					let (r, _, _) =
+						self.write_plan_new(tables, reindex, key, value.as_ref(), log)?;
 					Ok(r)
 				},
 				Operation::Dereference(key) => {
@@ -534,7 +535,7 @@ impl HashColumn {
 	fn write_plan_existing(
 		&self,
 		tables: &Tables,
-		change: &Operation<Key, Vec<u8>>,
+		change: &Operation<Key, ValuePtr>,
 		log: &mut LogWriter,
 		index: &IndexTable,
 		sub_index: usize,
@@ -645,7 +646,7 @@ impl HashColumn {
 				if record.table.index_bits() < tables.index.id.index_bits() {
 					// Insertion into a previously dropped index.
 					log::warn!(target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
-					return Err(Error::Corruption("Unexpected log index id".to_string()))
+					return Err(Error::Corruption("Unexpected log index id".to_string()));
 				}
 				// Re-launch previously started reindex
 				// TODO: add explicit log records for reindexing events.
@@ -656,7 +657,7 @@ impl HashColumn {
 					);
 					let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
 					std::mem::drop(lock);
-					return self.validate_plan(LogAction::InsertIndex(record), log)
+					return self.validate_plan(LogAction::InsertIndex(record), log);
 				}
 			},
 			LogAction::InsertValue(record) => {
@@ -664,7 +665,7 @@ impl HashColumn {
 			},
 			_ => {
 				log::error!(target: "parity-db", "Unexpected log action");
-				return Err(Error::Corruption("Unexpected log action".to_string()))
+				return Err(Error::Corruption("Unexpected log action".to_string()));
 			},
 		}
 		Ok(())
 	}
@@ -708,8 +709,9 @@ impl HashColumn {
 	pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
 		let action = |state| match state {
 			IterStateOrCorrupted::Item(item) => Ok(f(item)),
-			IterStateOrCorrupted::Corrupted(..) =>
-				Err(Error::Corruption("Missing indexed value".into())),
+			IterStateOrCorrupted::Corrupted(..) => {
+				Err(Error::Corruption("Missing indexed value".into()))
+			},
 		};
 		self.iter_while_inner(log, action, 0, true)
 	}
@@ -736,7 +738,7 @@ impl HashColumn {
 					if let Ok(value) = self.compression.decompress(&value) {
 						value
 					} else {
-						return false
+						return false;
 					}
 				} else {
 					value
@@ -759,7 +761,7 @@ impl HashColumn {
 			let entries = source.entries(c, log.overlays())?;
 			for entry in entries.iter() {
 				if entry.is_empty() {
-					continue
+					continue;
 				}
 				let (size_tier, offset) = if self.db_version >= 4 {
 					let address = entry.address(source.id.index_bits());
@@ -772,21 +774,21 @@ impl HashColumn {
 					(size_tier, offset)
 				};
 
-				if skip_preimage_indexes &&
-					self.preimage && size_tier as usize != tables.value.len() - 1
+				if skip_preimage_indexes
+					&& self.preimage && size_tier as usize != tables.value.len() - 1
 				{
-					continue
+					continue;
 				}
 				let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
 				let (value, rc, pk, compressed) = match value {
 					Ok(Some(v)) => v,
 					Ok(None) => {
 						f(IterStateOrCorrupted::Corrupted(*entry, None))?;
-						continue
+						continue;
 					},
 					Err(e) => {
 						f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
-						continue
+						continue;
 					},
 				};
 				let mut key = source.recover_key_prefix(c, *entry);
@@ -804,7 +806,7 @@ impl HashColumn {
 				let state = IterStateOrCorrupted::Item(IterState { chunk_index: c, key, rc, value });
 				if !f(state)? {
-					return Ok(())
+					return Ok(());
 				}
 			}
 		}
@@ -824,7 +826,7 @@ impl HashColumn {
 			|state| match state {
 				IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
 					if Some(chunk_index) == end_chunk {
-						return Ok(false)
+						return Ok(false);
 					}
 					if chunk_index % step == 0 {
 						log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
 					}
@@ -883,7 +885,7 @@ impl HashColumn {
 			let entries = source.entries(source_index, log.overlays())?;
 			for entry in entries.iter() {
 				if entry.is_empty() {
-					continue
+					continue;
 				}
 				// We only need key prefix to reindex.
 				let key = source.recover_key_prefix(source_index, *entry);
@@ -911,7 +913,7 @@ impl HashColumn {
 			table.unwrap().drop_file()?;
 		} else {
 			log::warn!(target: "parity-db", "Dropping invalid index {}", id);
-			return Ok(())
+			return Ok(());
 		}
 		log::debug!(target: "parity-db", "Dropped {}", id);
 		Ok(())
	}
@@ -948,23 +950,24 @@ impl Column {
 		};
 
 		match change {
-			Operation::Reference(_) =>
+			Operation::Reference(_) => {
 				if ref_counted {
 					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
 					tables.tables[tier].write_inc_ref(address.offset(), log)?;
 					Ok((Some(PlanOutcome::Written), None))
 				} else {
 					Ok((Some(PlanOutcome::Skipped), None))
-				},
+				}
+			},
 			Operation::Set(_, val) => {
 				if ref_counted {
 					log::trace!(target: "parity-db", "{}: Increment ref {}", tables.col, key);
 					tables.tables[tier].write_inc_ref(address.offset(), log)?;
-					return Ok((Some(PlanOutcome::Written), None))
+					return Ok((Some(PlanOutcome::Written), None));
 				}
 				if tables.preimage {
 					// Replace is not supported
-					return Ok((Some(PlanOutcome::Skipped), None))
+					return Ok((Some(PlanOutcome::Skipped), None));
 				}
 
 				let (cval, target_tier) =
diff --git a/src/db.rs b/src/db.rs
index 6216da7e..59cac476 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -56,6 +56,27 @@ const KEEP_LOGS: usize = 16;
 /// Value is just a vector of bytes. Value sizes up to 4Gb are allowed.
 pub type Value = Vec<u8>;
 
+#[derive(Debug, Clone)]
+pub struct ValuePtr(Arc<Value>);
+
+impl ValuePtr {
+	pub fn value(&self) -> &Value {
+		&*self.0
+	}
+}
+
+impl AsRef<[u8]> for ValuePtr {
+	fn as_ref(&self) -> &[u8] {
+		self.0.as_ref()
+	}
+}
+
+impl From<Value> for ValuePtr {
+	fn from(value: Value) -> Self {
+		Self(value.into())
+	}
+}
+
 // Commit data passed to `commit`
 #[derive(Debug, Default)]
 struct Commit {
@@ -136,7 +157,7 @@ impl DbInner {
 		if opening_mode == OpeningMode::Create {
 			try_io!(std::fs::create_dir_all(&options.path));
 		} else if !options.path.is_dir() {
-			return Err(Error::DatabaseNotFound)
+			return Err(Error::DatabaseNotFound);
 		}
 
 		let mut lock_path: std::path::PathBuf = options.path.clone();
@@ -192,7 +213,7 @@ impl DbInner {
 				let overlay = self.commit_overlay.read();
 				// Check commit overlay first
 				if let Some(v) = overlay.get(col as usize).and_then(|o| o.get(&key)) {
-					return Ok(v)
+					return Ok(v.map(|i| i.value().clone()));
 				}
 				// Go into tables and log overlay.
 				let log = self.log.overlays();
@@ -201,7 +222,7 @@ impl DbInner {
 			Column::Tree(column) => {
 				let overlay = self.commit_overlay.read();
 				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
-					return Ok(l.cloned())
+					return Ok(l.cloned());
 				}
 				// We lock log, if btree structure changed while reading that would be an issue.
 				let log = self.log.overlays().read();
@@ -217,7 +238,7 @@ impl DbInner {
 				let overlay = self.commit_overlay.read();
 				// Check commit overlay first
 				if let Some(l) = overlay.get(col as usize).and_then(|o| o.get_size(&key)) {
-					return Ok(l)
+					return Ok(l);
 				}
 				// Go into tables and log overlay.
 				let log = self.log.overlays();
@@ -226,7 +247,7 @@ impl DbInner {
 			Column::Tree(column) => {
 				let overlay = self.commit_overlay.read();
 				if let Some(l) = overlay.get(col as usize).and_then(|o| o.btree_get(key)) {
-					return Ok(l.map(|v| v.len() as u32))
+					return Ok(l.map(|v| v.len() as u32));
 				}
 				let log = self.log.overlays().read();
 				let l = column.with_locked(|btree| BTreeTable::get(key, &*log, btree))?;
@@ -237,8 +258,9 @@ impl DbInner {
 	fn btree_iter(&self, col: ColId) -> Result<BTreeIterator> {
 		match &self.columns[col as usize] {
-			Column::Hash(_column) =>
-				Err(Error::InvalidConfiguration("Not an indexed column.".to_string())),
+			Column::Hash(_column) => {
+				Err(Error::InvalidConfiguration("Not an indexed column.".to_string()))
+			},
 			Column::Tree(column) => {
 				let log = self.log.overlays();
 				BTreeIterator::new(column, col, log, &self.commit_overlay)
@@ -303,7 +325,7 @@ impl DbInner {
 		{
 			let bg_err = self.bg_err.lock();
 			if let Some(err) = &*bg_err {
-				return Err(Error::Background(err.clone()))
+				return Err(Error::Background(err.clone()));
 			}
 		}
 
@@ -368,8 +390,8 @@ impl DbInner {
 				commit.bytes,
 				queue.bytes,
 			);
-			if queue.bytes <= MAX_COMMIT_QUEUE_BYTES &&
-				(queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
+			if queue.bytes <= MAX_COMMIT_QUEUE_BYTES
+				&& (queue.bytes + commit.bytes) > MAX_COMMIT_QUEUE_BYTES
 			{
 				// Past the waiting threshold.
 				log::debug!(
@@ -406,10 +428,11 @@ impl DbInner {
 		for (c, btree) in commit.changeset.btree_indexed.iter_mut() {
 			match &self.columns[*c as usize] {
-				Column::Hash(_column) =>
+				Column::Hash(_column) => {
 					return Err(Error::InvalidConfiguration(
 						"Not an indexed column.".to_string(),
-					)),
+					))
+				},
 				Column::Tree(column) => {
 					btree.write_plan(column, &mut writer, &mut ops)?;
 				},
 			}
@@ -468,7 +491,7 @@ impl DbInner {
 	fn process_reindex(&self) -> Result<bool> {
 		let next_reindex = self.next_reindex.load(Ordering::SeqCst);
 		if next_reindex == 0 || next_reindex > self.last_enacted.load(Ordering::SeqCst) {
-			return Ok(false)
+			return Ok(false);
 		}
 		// Process any pending reindexes
 		for column in self.columns.iter() {
@@ -508,7 +531,7 @@ impl DbInner {
 					self.start_reindex(record_id);
 				}
 				self.flush_worker_wait.signal();
-				return Ok(true)
+				return Ok(true);
 			}
 		}
 		self.next_reindex.store(0, Ordering::SeqCst);
@@ -522,7 +545,7 @@ impl DbInner {
 			Err(Error::Corruption(_)) if validation_mode => {
 				log::debug!(target: "parity-db", "Bad log header");
 				self.log.clear_replay_logs();
-				return Ok(false)
+				return Ok(false);
 			},
 			Err(e) => return Err(e),
 		};
@@ -542,7 +565,7 @@ impl DbInner {
 				);
 				drop(reader);
 				self.log.clear_replay_logs();
-				return Ok(false)
+				return Ok(false);
 			}
 			// Validate all records before applying anything
 			loop {
@@ -552,7 +575,7 @@ impl DbInner {
 						log::debug!(target: "parity-db", "Error reading log: {:?}", e);
 						drop(reader);
 						self.log.clear_replay_logs();
-						return Ok(false)
+						return Ok(false);
 					},
 				};
 				match next {
@@ -560,7 +583,7 @@ impl DbInner {
 						log::debug!(target: "parity-db", "Unexpected log header");
 						drop(reader);
 						self.log.clear_replay_logs();
-						return Ok(false)
+						return Ok(false);
 					},
 					LogAction::EndRecord => break,
 					LogAction::InsertIndex(insertion) => {
@@ -577,7 +600,7 @@ impl DbInner {
 							log::warn!(target: "parity-db", "Error replaying log: {:?}. Reverting", e);
 							drop(reader);
 							self.log.clear_replay_logs();
-							return Ok(false)
+							return Ok(false);
 						}
 					},
 					LogAction::InsertValue(insertion) => {
@@ -594,7 +617,7 @@ impl DbInner {
 							log::warn!(target: "parity-db", "Error replaying log: {:?}. Reverting", e);
 							drop(reader);
 							self.log.clear_replay_logs();
-							return Ok(false)
+							return Ok(false);
 						}
 					},
 					LogAction::DropTable(_) => continue,
@@ -605,8 +628,9 @@ impl DbInner {
 			}
 			loop {
 				match reader.next()? {
-					LogAction::BeginRecord =>
-						return Err(Error::Corruption("Bad log record".into())),
+					LogAction::BeginRecord => {
+						return Err(Error::Corruption("Bad log record".into()))
+					},
 					LogAction::EndRecord => break,
 					LogAction::InsertIndex(insertion) => {
 						self.columns[insertion.table.col() as usize]
@@ -666,8 +690,8 @@ impl DbInner {
 				);
 			}
 			*queue -= bytes as i64;
-			if *queue <= MAX_LOG_QUEUE_BYTES &&
-				(*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
+			if *queue <= MAX_LOG_QUEUE_BYTES
+				&& (*queue + bytes as i64) > MAX_LOG_QUEUE_BYTES
 			{
 				self.log_queue_wait.cv.notify_one();
 			}
@@ -741,7 +765,7 @@ impl DbInner {
 				// to no attempt any further log enactment.
 				log::debug!(target: "parity-db", "Shutdown with error state {}", err);
 				self.log.clean_logs(self.log.num_dirty_logs())?;
-				return self.log.kill_logs()
+				return self.log.kill_logs();
 			}
 		}
 		log::debug!(target: "parity-db", "Processing leftover commits");
@@ -855,7 +879,7 @@ impl Db {
 			log::debug!(target: "parity-db", "Error during log replay, doing log cleanup");
 			db.log.clean_logs(db.log.num_dirty_logs())?;
 			db.log.kill_logs()?;
-			return Err(e)
+			return Err(e);
 		}
 		let db = Arc::new(db);
 		#[cfg(any(test, feature = "instrumentation"))]
@@ -1124,7 +1148,7 @@ impl Drop for Db {
 	}
 }
 
-pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<Value>), IdentityBuildHasher>;
+pub type IndexedCommitOverlay = HashMap<Key, (u64, Option<ValuePtr>), IdentityBuildHasher>;
 pub type BTreeCommitOverlay = BTreeMap<Vec<u8>, (u64, Option<Value>)>;
 
 #[derive(Debug)]
@@ -1145,16 +1169,16 @@ impl CommitOverlay {
 	}
 }
 
 impl CommitOverlay {
-	fn get_ref(&self, key: &[u8]) -> Option<Option<&Value>> {
+	fn get_ref(&self, key: &[u8]) -> Option<Option<&ValuePtr>> {
 		self.indexed.get(key).map(|(_, v)| v.as_ref())
 	}
 
-	fn get(&self, key: &[u8]) -> Option<Option<Value>> {
+	fn get(&self, key: &[u8]) -> Option<Option<ValuePtr>> {
 		self.get_ref(key).map(|v| v.cloned())
 	}
 
 	fn get_size(&self, key: &[u8]) -> Option<Option<u32>> {
-		self.get_ref(key).map(|res| res.as_ref().map(|b| b.len() as u32))
+		self.get_ref(key).map(|res| res.as_ref().map(|b| b.value().len() as u32))
 	}
 
 	fn btree_get(&self, key: &[u8]) -> Option<Option<&Value>> {
@@ -1271,7 +1295,7 @@ pub struct CommitChangeSet {
 #[derive(Debug)]
 pub struct IndexedChangeSet {
 	pub col: ColId,
-	pub changes: Vec<Operation<Key, Vec<u8>>>,
+	pub changes: Vec<Operation<Key, ValuePtr>>,
 }
 
 impl IndexedChangeSet {
@@ -1291,13 +1315,13 @@ impl IndexedChangeSet {
 		};
 
 		self.push_change_hashed(match change {
-			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v),
+			Operation::Set(k, v) => Operation::Set(hash_key(k.as_ref()), v.into()),
 			Operation::Dereference(k) => Operation::Dereference(hash_key(k.as_ref())),
 			Operation::Reference(k) => Operation::Reference(hash_key(k.as_ref())),
 		})
 	}
 
-	fn push_change_hashed(&mut self, change: Operation<Key, Vec<u8>>) {
+	fn push_change_hashed(&mut self, change: Operation<Key, ValuePtr>) {
 		self.changes.push(change);
 	}
 
@@ -1313,7 +1337,7 @@ impl IndexedChangeSet {
 			match &change {
 				Operation::Set(k, v) => {
 					*bytes += k.len();
-					*bytes += v.len();
+					*bytes += v.value().len();
 					overlay.indexed.insert(*k, (record_id, Some(v.clone())));
 				},
 				Operation::Dereference(k) => {
@@ -1326,7 +1350,7 @@ impl IndexedChangeSet {
 					// Don't add (we allow remove value in overlay when using rc: some
 					// indexing on top of it is expected).
 					if !ref_counted {
-						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)))
+						return Err(Error::InvalidInput(format!("No Rc for column {}", self.col)));
 					}
 				},
 			}
@@ -1345,7 +1369,7 @@ impl IndexedChangeSet {
 			Column::Hash(column) => column,
 			Column::Tree(_) => {
 				log::warn!(target: "parity-db", "Skipping unindex commit in indexed column");
-				return Ok(())
+				return Ok(());
 			},
 		};
 		for change in self.changes.iter() {
@@ -1362,12 +1386,13 @@ impl IndexedChangeSet {
 		use std::collections::hash_map::Entry;
 		for change in self.changes.iter() {
 			match change {
-				Operation::Set(k, _) | Operation::Dereference(k) =>
+				Operation::Set(k, _) | Operation::Dereference(k) => {
 					if let Entry::Occupied(e) = overlay.indexed.entry(*k) {
 						if e.get().0 == record_id {
 							e.remove_entry();
 						}
-					},
+					}
+				},
 				Operation::Reference(..) => (),
 			}
 		}
@@ -1466,8 +1491,8 @@ mod tests {
 		fn run_stages(&self, db: &Db) {
 			let db = &db.inner;
-			if *self == EnableCommitPipelineStages::DbFile ||
-				*self == EnableCommitPipelineStages::LogOverlay
+			if *self == EnableCommitPipelineStages::DbFile
+				|| *self == EnableCommitPipelineStages::LogOverlay
 			{
 				while db.process_commits().unwrap() {}
 				while db.process_reindex().unwrap() {}
@@ -1494,7 +1519,7 @@ mod tests {
 					// or removed.
 					std::thread::sleep(std::time::Duration::from_millis(100));
 				} else {
-					return false
+					return false;
 				}
 			}
 		}
@@ -2026,18 +2051,20 @@ mod tests {
 				let mut remove = false;
 				let mut insert = false;
 				match state.get_mut(k) {
-					Some(Some((_, counter))) =>
+					Some(Some((_, counter))) => {
 						if v.is_some() {
 							*counter += 1;
 						} else if *counter == 1 {
 							remove = true;
 						} else {
 							*counter -= 1;
-						},
+						}
+					},
-					Some(None) | None =>
+					Some(None) | None => {
 						if v.is_some() {
 							insert = true;
-						},
+						}
+					},
 				}
 				if insert {
 					state.insert(k.clone(), v.clone().map(|v| (v, 1)));
diff --git a/src/migration.rs b/src/migration.rs
index 6a7e050e..ac9ce1dd 100644
--- a/src/migration.rs
+++ b/src/migration.rs
@@ -31,14 +31,14 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
 		to_migrate.insert(*force);
 	}
 	if source_meta.columns.len() != to.columns.len() {
-		return Err(Error::Migration("Source and dest columns mismatch".into()))
+		return Err(Error::Migration("Source and dest columns mismatch".into()));
 	}
 
 	// Make sure we are using the same salt value.
 	to.salt = Some(source_meta.salt);
 	if (to.salt.is_none()) && overwrite {
-		return Err(Error::Migration("Changing salt need to update metadata at once.".into()))
+		return Err(Error::Migration("Changing salt need to update metadata at once.".into()));
 	}
 
 	let mut source_options = Options::with_columns(from, source_meta.columns.len() as u8);
@@ -62,7 +62,7 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
 		if source_options.columns[*c as usize].btree_index || to.columns[*c as usize].btree_index {
 			return Err(Error::Migration(
 				"Migrate only implemented for hash indexed column to hash indexed column".into(),
-			))
+			));
 		}
 	}
 
@@ -73,7 +73,7 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
 				copy_column(c, from, &to.path)?;
 				dest = Db::open_or_create(&to)?;
 			}
-			continue
+			continue;
 		}
 		log::info!("Migrating col {}", c);
 		source.iter_column_while(c, |IterState { chunk_index: index, key, rc, mut value }| {
@@ -85,13 +85,13 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
 				.entry(c)
 				.or_insert_with(|| IndexedChangeSet::new(c))
 				.changes
-				.push(Operation::Set(key, value));
+				.push(Operation::Set(key, value.into()));
 			nb_commit += 1;
 			if nb_commit == COMMIT_SIZE {
 				ncommits += 1;
 				if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
 					log::warn!("Migration error: {:?}", e);
-					return false
+					return false;
 				}
 				nb_commit = 0;
 
@@ -157,7 +157,7 @@ pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
 		.ok_or_else(|| Error::Migration("Error loading source metadata".into()))?;
 
 	if (column as usize) >= meta.columns.len() {
-		return Err(Error::Migration("Invalid column index".into()))
+		return Err(Error::Migration("Invalid column index".into()));
 	}
 
 	// Validate the database by opening. This also makes sure all the logs are enacted,
@@ -174,8 +174,8 @@ pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
 	for entry in try_io!(std::fs::read_dir(path)) {
 		let entry = try_io!(entry);
 		if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
-			if crate::index::TableId::is_file_name(column, file) ||
-				crate::table::TableId::is_file_name(column, file)
+			if crate::index::TableId::is_file_name(column, file)
+				|| crate::table::TableId::is_file_name(column, file)
 			{
 				to_delete.push(PathBuf::from(file));
 			}
@@ -202,8 +202,8 @@ fn deplace_column(c: ColId, from: &Path, to: &Path, copy: bool) -> Result<()> {
 	for entry in try_io!(std::fs::read_dir(from)) {
 		let entry = try_io!(entry);
 		if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
-			if crate::index::TableId::is_file_name(c, file) ||
-				crate::table::TableId::is_file_name(c, file)
+			if crate::index::TableId::is_file_name(c, file)
+				|| crate::table::TableId::is_file_name(c, file)
 			{
 				let mut from = from.to_path_buf();
 				from.push(file);
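
Reviewer note (not part of the patch): a minimal standalone sketch of the idea behind the ValuePtr type added in src/db.rs. It uses a simplified stand-in defined locally, not the crate's actual implementation; the main() demo and its names are illustrative assumptions. The point is that once commit values sit behind an Arc, copying a pending commit into the commit overlay clones a reference-counted pointer instead of reallocating the whole Vec<u8>.

use std::sync::Arc;

// Value as defined in src/db.rs: a plain byte vector.
type Value = Vec<u8>;

// Simplified stand-in for the patch's ValuePtr: a cheaply clonable,
// shared handle to an immutable value.
#[derive(Debug, Clone)]
struct ValuePtr(Arc<Value>);

impl From<Value> for ValuePtr {
	fn from(value: Value) -> Self {
		Self(Arc::new(value))
	}
}

fn main() {
	// A large value owned by a pending commit.
	let committed: ValuePtr = vec![0u8; 1024 * 1024].into();

	// Copying it into the commit overlay (what copy_to_overlay does) now
	// clones the Arc, not the megabyte of payload.
	let overlay_copy = committed.clone();
	assert!(Arc::ptr_eq(&committed.0, &overlay_copy.0));
	assert_eq!(overlay_copy.0.len(), 1024 * 1024);
}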