From baaec213e6e9a79bb3ca8269034062a6177547f4 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sat, 11 May 2024 15:40:41 +0200
Subject: [PATCH] add an example program to try to reproduce odb errors due to parallelism.

A motivating example is here:
https://github.com/praetorian-inc/noseyparker/issues/179
---
 gitoxide-core/src/repository/odb.rs           | 76 ++++++++++++++++---
 gix-odb/src/store_impls/dynamic/load_index.rs |  5 +-
 src/plumbing/main.rs                          |  8 +-
 src/plumbing/options/mod.rs                   |  6 +-
 4 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/gitoxide-core/src/repository/odb.rs b/gitoxide-core/src/repository/odb.rs
index 85c35618db2..239a082b2fa 100644
--- a/gitoxide-core/src/repository/odb.rs
+++ b/gitoxide-core/src/repository/odb.rs
@@ -1,4 +1,5 @@
 use std::io;
+use std::sync::atomic::Ordering;
 
 use anyhow::bail;
 
@@ -50,6 +51,8 @@ pub mod statistics {
     pub struct Options {
         pub format: OutputFormat,
         pub thread_limit: Option<usize>,
+        /// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
+        pub extra_header_lookup: bool,
     }
 }
 
@@ -59,7 +62,11 @@ pub fn statistics(
     mut progress: impl gix::Progress,
     out: impl io::Write,
     mut err: impl io::Write,
-    statistics::Options { format, thread_limit }: statistics::Options,
+    statistics::Options {
+        format,
+        thread_limit,
+        extra_header_lookup,
+    }: statistics::Options,
 ) -> anyhow::Result<()> {
     use bytesize::ByteSize;
     use gix::odb::{find, HeaderExt};
@@ -76,6 +83,10 @@ pub fn statistics(
     #[cfg_attr(feature = "serde", derive(serde::Serialize))]
     #[derive(Default)]
     struct Statistics {
+        /// All objects that were used to produce these statistics.
+        /// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
+        #[cfg_attr(feature = "serde", serde(skip_serializing))]
+        ids: Option<Vec<gix::ObjectId>>,
         total_objects: usize,
         loose_objects: usize,
         packed_objects: usize,
@@ -135,14 +146,17 @@ pub fn statistics(
     }
 
     impl gix::parallel::Reduce for Reduce {
-        type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
+        type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
         type FeedProduce = ();
         type Output = Statistics;
         type Error = anyhow::Error;
 
         fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
-            for item in items? {
+            for (id, item) in items? {
                 self.stats.consume(item);
+                if let Some(ids) = self.stats.ids.as_mut() {
+                    ids.push(id);
+                }
             }
             Ok(())
         }
@@ -154,30 +168,41 @@ pub fn statistics(
     }
 
     let cancelled = || anyhow::anyhow!("Cancelled by user");
-    let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
+    let object_ids = repo.objects.iter()?.filter_map(Result::ok);
     let chunk_size = 1_000;
-    let stats = if gix::parallel::num_threads(thread_limit) > 1 {
+    let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
         gix::parallel::in_parallel(
             gix::interrupt::Iter::new(
                 gix::features::iter::Chunks {
                     inner: object_ids,
                     size: chunk_size,
                 },
                 cancelled,
             ),
             thread_limit,
-            move |_| (repo.objects.clone().into_inner(), counter),
+            {
+                let objects = repo.objects.clone();
+                move |_| (objects.clone().into_inner(), counter)
+            },
             |ids, (handle, counter)| {
                 let ids = ids?;
-                counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
+                counter.fetch_add(ids.len(), Ordering::Relaxed);
                 let out = ids
                     .into_iter()
-                    .map(|id| handle.header(id))
+                    .map(|id| handle.header(id).map(|hdr| (id, hdr)))
                     .collect::<Result<Vec<_>, _>>()?;
                 Ok(out)
             },
-            Reduce::default(),
+            Reduce {
+                stats: Statistics {
+                    ids: extra_header_lookup.then(Vec::new),
+                    ..Default::default()
+                },
+            },
         )?
     } else {
+        if extra_header_lookup {
+            bail!("extra-header-lookup is only meaningful in threaded mode");
+        }
         let mut stats = Statistics::default();
         for (count, id) in object_ids.enumerate() {
@@ -193,6 +218,39 @@ pub fn statistics(
 
     progress.show_throughput(start);
 
+    if let Some(mut ids) = stats.ids.take() {
+        // Critical to re-open the repo to ensure we don't have any ODB state and start fresh.
+        let start = std::time::Instant::now();
+        let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
+        progress.set_name("re-counting".into());
+        progress.init(Some(ids.len()), gix::progress::count("objects"));
+        let counter = progress.counter();
+        counter.store(0, Ordering::Relaxed);
+        let errors = gix::parallel::in_parallel_with_slice(
+            &mut ids,
+            thread_limit,
+            {
+                let objects = repo.objects.clone();
+                move |_| (objects.clone().into_inner(), counter, false)
+            },
+            |id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
+                counter.fetch_add(1, Ordering::Relaxed);
+                if let Err(err) = odb.header(id) {
+                    *has_error = true;
+                    gix::trace::error!(err = ?err, "Object that is known to be present wasn't found");
+                }
+                Ok(())
+            },
+            || Some(std::time::Duration::from_millis(100)),
+            |(_, _, has_error)| has_error,
+        )?;
+
+        progress.show_throughput(start);
+        if errors.contains(&true) {
+            bail!("At least one object couldn't be looked up even though it must exist");
+        }
+    }
+
     #[cfg(feature = "serde")]
     {
         serde_json::to_writer_pretty(out, &stats)?;

diff --git a/gix-odb/src/store_impls/dynamic/load_index.rs b/gix-odb/src/store_impls/dynamic/load_index.rs
index a255f6c4b58..2537cb5438c 100644
--- a/gix-odb/src/store_impls/dynamic/load_index.rs
+++ b/gix-odb/src/store_impls/dynamic/load_index.rs
@@ -119,7 +119,7 @@ impl super::Store {
                 let slot = &self.files[index.slot_indices[slot_map_index]];
                 let _lock = slot.write.lock();
                 if slot.generation.load(Ordering::SeqCst) > index.generation {
-                    // There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
+                    // There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
                     // index, one we didn't intend to load.
                     // Continue with the next slot index in the hope there is something else we can do…
                     continue 'retry_with_next_slot_index;
@@ -134,7 +134,8 @@ impl super::Store {
                         slot.files.store(bundle);
                         break 'retry_with_next_slot_index;
                     }
-                    Err(_) => {
+                    Err(_err) => {
+                        gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
                         slot.files.store(bundle);
                         continue 'retry_with_next_slot_index;
                     }

diff --git a/src/plumbing/main.rs b/src/plumbing/main.rs
index 0f5a146d0ed..49b2c50fe9e 100644
--- a/src/plumbing/main.rs
+++ b/src/plumbing/main.rs
@@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
                 ),
             },
             Subcommands::Odb(cmd) => match cmd {
-                odb::Subcommands::Stats => prepare_and_run(
+                odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
                     "odb-stats",
                     trace,
                     auto_verbose,
@@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
                         progress,
                         out,
                         err,
-                        core::repository::odb::statistics::Options { format, thread_limit },
+                        core::repository::odb::statistics::Options {
+                            format,
+                            thread_limit,
+                            extra_header_lookup,
+                        },
                     )
                 },
             ),

diff --git a/src/plumbing/options/mod.rs b/src/plumbing/options/mod.rs
index b5fa3649a68..10d0312bc7b 100644
--- a/src/plumbing/options/mod.rs
+++ b/src/plumbing/options/mod.rs
@@ -586,7 +586,11 @@ pub mod odb {
         Info,
         /// Count and obtain information on all, possibly duplicate, objects in the database.
         #[clap(visible_alias = "statistics")]
-        Stats,
+        Stats {
+            /// Look up headers again, but without preloading indices.
+            #[clap(long)]
+            extra_header_lookup: bool,
+        },
     }
 }
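
With a `gix` binary built from this patch, the new debug flag can be exercised directly; a sketch of an invocation follows. It assumes clap's default name mangling (the `extra_header_lookup` field with `#[clap(long)]` becomes `--extra-header-lookup`), a repository discovered from the working directory, and a machine with more than one core, since the extra pass bails out in non-threaded mode.

    # First pass: count all objects in parallel while collecting their ids.
    # Extra pass: re-open the repository and look up every header again
    # without preloaded indices; fails if any known object cannot be found.
    gix odb stats --extra-header-lookup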