diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 1790046f2d..784f0ba894 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -60,7 +60,7 @@ pub trait CompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result; } @@ -123,7 +123,7 @@ impl CompactionPicker for CommonCompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result { let expire_time = ctx.ttl.map(Timestamp::expire_time); let mut builder = @@ -732,8 +732,8 @@ mod tests { }; let now = Timestamp::now(); { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); assert_eq!(task.inputs[0].files.len(), 2); assert_eq!(task.inputs[0].files[0].id(), 0); assert_eq!(task.inputs[0].files[1].id(), 1); @@ -742,8 +742,8 @@ mod tests { } { - let lc = build_newest_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + let mut lc = build_newest_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); assert_eq!(task.inputs[0].files.len(), 4); assert_eq!(task.inputs[0].files[0].id(), 2); assert_eq!(task.inputs[0].files[1].id(), 3); @@ -752,16 +752,16 @@ mod tests { } { - let lc = build_newest_bucket_no_match_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + let mut lc = build_newest_bucket_no_match_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); assert_eq!(task.inputs.len(), 0); } // If ttl is None, then no file is expired. ctx.ttl = None; { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx, &lc).unwrap(); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx, &mut lc).unwrap(); assert_eq!(task.inputs[0].files.len(), 2); assert_eq!(task.inputs[0].files[0].id(), 0); assert_eq!(task.inputs[0].files[1].id(), 1); diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs index 396dd66610..ddb7ffd21d 100644 --- a/analytic_engine/src/table/version.rs +++ b/analytic_engine/src/table/version.rs @@ -727,11 +727,9 @@ impl TableVersion { picker_ctx: PickerContext, picker: &CompactionPickerRef, ) -> picker::Result { - // Pick will set FileHandle to being_compacted state, so we require - // write lock here to prevent other threads pick same file to compact. - let inner = self.inner.write().unwrap(); + let mut inner = self.inner.write().unwrap(); - picker.pick_compaction(picker_ctx, &inner.levels_controller) + picker.pick_compaction(picker_ctx, &mut inner.levels_controller) } pub fn has_expired_sst(&self, expire_time: Option) -> bool {