Skip to content

Commit

Permalink
Add logging to merging vector segments (#31833)
Browse files Browse the repository at this point in the history
Add logging to vector segment merging so we can see which segments are being merged. Also adds an earlier check that the id tracker has no soft deletes, so we know exactly which segment update breaks that invariant.

GitOrigin-RevId: a324518cd39293b4d1e10d98b1b5446eeef0fe66
  • Loading branch information
emmaling27 authored and Convex, Inc. committed Nov 27, 2024
1 parent 6cae8a7 commit 2176c06
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
21 changes: 17 additions & 4 deletions crates/search/src/fragmented_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub(crate) struct FragmentedSegmentFetcher<RT: Runtime> {
archive_cache: ArchiveCacheManager<RT>,
}

#[derive(Debug)]
pub struct FragmentedSegmentStorageKeys {
pub segment: ObjectKey,
pub id_tracker: ObjectKey,
Expand Down Expand Up @@ -159,7 +160,7 @@ impl<RT: Runtime> FragmentedSegmentCompactor<RT> {
}
}

pub async fn compact<'a, T: TryInto<FragmentedSegmentStorageKeys> + Send + 'a>(
pub async fn compact<'a, T: TryInto<FragmentedSegmentStorageKeys> + Clone + Send + 'a>(
&'a self,
segments: Vec<T>,
dimension: usize,
Expand All @@ -170,15 +171,24 @@ impl<RT: Runtime> FragmentedSegmentCompactor<RT> {
<T as TryInto<FragmentedSegmentStorageKeys>>::Error: From<std::io::Error> + Send,
<T as TryInto<FragmentedSegmentStorageKeys>>::Error: From<anyhow::Error> + 'static,
{
let segment_keys: Vec<FragmentedSegmentStorageKeys> = segments
.clone()
.into_iter()
.map(|s| s.try_into())
.try_collect()?;
tracing::info!("Compacting {} segments: {:?}", segments.len(), segment_keys);
let timer = vector_compact_seconds_timer();
let fetch_timer = vector_compact_fetch_segments_seconds_timer();
let segments: Vec<_> = self
.segment_fetcher
.stream_fetch_fragmented_segments(search_storage.clone(), segments)
.and_then(|paths| async move {
self.blocking_thread_pool
let paths_clone = paths.clone();
let segment = self
.blocking_thread_pool
.execute(|| load_disk_segment(paths))
.await?
.await??;
anyhow::Ok((paths_clone, segment))
})
.try_collect()
.await?;
Expand All @@ -196,7 +206,10 @@ impl<RT: Runtime> FragmentedSegmentCompactor<RT> {
.execute(move || {
let timer = vector_compact_construct_segment_seconds_timer();
let result = merge_disk_segments_hnsw(
segments.iter().collect_vec(),
segments
.iter()
.map(|(paths, segment)| (Some(paths.clone()), segment))
.collect_vec(),
dimension,
&scratch_dir,
&target_path,
Expand Down
19 changes: 14 additions & 5 deletions crates/vector/src/qdrant_segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ pub fn build_disk_segment(
disk_path: &Path,
segment_config: SegmentConfig,
) -> anyhow::Result<VectorDiskSegmentValues> {
merge_disk_segments(vec![segment], tmp_path, disk_path, segment_config)
merge_disk_segments(vec![(None, segment)], tmp_path, disk_path, segment_config)
}

pub fn merge_disk_segments_hnsw(
segments: Vec<&Segment>,
segments: Vec<(Option<UntarredVectorDiskSegmentPaths>, &Segment)>,
dimension: usize,
tmp_path: &Path,
disk_path: &Path,
Expand All @@ -265,7 +265,7 @@ pub fn merge_disk_segments_hnsw(
}

pub fn merge_disk_segments(
segments: Vec<&Segment>,
segments: Vec<(Option<UntarredVectorDiskSegmentPaths>, &Segment)>,
tmp_path: &Path,
disk_path: &Path,
segment_config: SegmentConfig,
Expand All @@ -285,8 +285,12 @@ pub fn merge_disk_segments(
let mut segment_builder =
SegmentBuilder::new(&tmp_segment_path, &segment_tmp_dir_path, &segment_config)?;
let stopped = AtomicBool::new(false);
for segment in segments {
for (paths, segment) in segments {
tracing::info!("Updating new segment with segment from paths {:?}", paths);
segment_builder.update_from(segment, &stopped)?;
if let Some(ref segment) = segment_builder.segment {
anyhow::ensure!(segment.id_tracker.borrow().deleted_point_count() == 0);
}
}
let permit = CpuPermit::dummy(4);
let disk_segment = segment_builder.build(permit, &stopped)?;
Expand Down Expand Up @@ -1317,7 +1321,12 @@ mod tests {
fs::create_dir_all(&indexing_path)?;
let disk_path = tmp_dir.path().join("disk");
fs::create_dir_all(&disk_path)?;
merge_disk_segments(segments, &indexing_path, &disk_path, config)
merge_disk_segments(
segments.into_iter().map(|s| (None, s)).collect(),
&indexing_path,
&disk_path,
config,
)
}

// One way this might happen is if we accidentally have the same vector in a
Expand Down

0 comments on commit 2176c06

Please sign in to comment.