fix: executes pending ddls if region memtable is empty while scheduling next flush #4119

Merged · 6 commits · Jun 11, 2024
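In short: after a successful flush, `FlushScheduler::on_flush_success` previously drained a region's pending DDL and write requests only when the region had no pending flush task; if a pending task existed but the region's memtables were already empty, the queued DDLs stayed behind a flush that had nothing left to do. The change in `src/mito2/src/flush.rs` finishes the pending task and drains the pending requests in that case. A rough sketch of that decision, using simplified placeholder types rather than the real mito2 structures:

/// Placeholders for the region's queued requests (stand-ins for the real
/// pending DDL / pending write request types in mito2).
#[derive(Default)]
struct PendingRequests {
    ddls: Vec<String>,
    writes: Vec<String>,
}

/// Stand-in for the scheduler's per-region flush bookkeeping.
struct FlushStatus {
    pending_task: Option<RegionFlushTask>,
    /// Stand-in for `version_control.current().version.memtables.is_empty()`.
    memtables_empty: bool,
    pending: PendingRequests,
}

/// Stand-in for mito2's `RegionFlushTask`.
struct RegionFlushTask;

impl RegionFlushTask {
    fn on_success(self) {
        // Notify waiters that the flush finished successfully (nothing was written).
    }
}

/// Returns the pending requests to execute now; `None` means the region still
/// has data to flush and must stay in the scheduler.
fn take_pending_if_done(status: &mut FlushStatus) -> Option<PendingRequests> {
    if status.pending_task.is_none() {
        // No pending flush task: the region is done, drain its pending requests.
        return Some(std::mem::take(&mut status.pending));
    }
    if status.memtables_empty {
        // Case added by this PR: a flush is queued but there is nothing left to
        // flush, so finish the task and drain the pending requests instead of
        // leaving them queued behind an empty flush.
        let task = status.pending_task.take().unwrap();
        task.on_success();
        return Some(std::mem::take(&mut status.pending));
    }
    // There is still data to flush; keep the region (and its pending task) queued.
    None
}

In the real scheduler the drained DDLs and writes are then handed back to the worker, so they run immediately instead of waiting on a flush that will never produce data.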
28 changes: 1 addition & 27 deletions src/mito2/src/compaction.rs
@@ -518,13 +518,10 @@ fn get_expired_ssts(

#[cfg(test)]
mod tests {
use std::sync::Mutex;

use tokio::sync::oneshot;

use super::*;
use crate::schedule::scheduler::{Job, Scheduler};
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

#[tokio::test]
@@ -574,29 +571,6 @@ mod tests {
assert!(scheduler.region_status.is_empty());
}

#[derive(Default)]
struct VecScheduler {
jobs: Mutex<Vec<Job>>,
}

impl VecScheduler {
fn num_jobs(&self) -> usize {
self.jobs.lock().unwrap().len()
}
}

#[async_trait::async_trait]
impl Scheduler for VecScheduler {
fn schedule(&self, job: Job) -> Result<()> {
self.jobs.lock().unwrap().push(job);
Ok(())
}

async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}

#[tokio::test]
async fn test_schedule_on_finished() {
let job_scheduler = Arc::new(VecScheduler::default());
99 changes: 98 additions & 1 deletion src/mito2/src/engine/alter_test.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
@@ -29,9 +31,11 @@ use store_api::region_request::{
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::listener::AlterFlushListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, build_rows_for_key, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
};

async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
@@ -300,3 +304,96 @@ async fn test_alter_region_retry() {
assert_eq!(1, version_data.version.flushed_entry_id);
assert_eq!(2, version_data.version.flushed_sequence);
}

#[tokio::test]
async fn test_alter_on_flushing() {
common_telemetry::init_default_ut_logging();

let mut env = TestEnv::new();
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;

// Spawns a task to flush the engine.
let engine_cloned = engine.clone();
let flush_job = tokio::spawn(async move {
flush_region(&engine_cloned, region_id, None).await;
});
// Waits for flush begin.
listener.wait_flush_begin().await;

// Consumes the notify permit in the listener.
listener.wait_request_begin().await;

// Submits an alter request to the region. The region should add the request
// to the pending ddl request list.
let request = add_tag1();
let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
});
// Waits until the worker handles the alter request.
listener.wait_request_begin().await;

// Spawns another task to flush the engine. The flush scheduler should put it
// into the pending task list.
let engine_cloned = engine.clone();
let pending_flush_job = tokio::spawn(async move {
flush_region(&engine_cloned, region_id, None).await;
});
// Waits until the worker handles the flush request.
listener.wait_request_begin().await;

// Wakes up the blocked flush.
listener.wake_flush();
// Waits for the first flush job.
tokio::time::timeout(Duration::from_secs(5), flush_job)
.await
.unwrap()
.unwrap();
// Waits for the pending flush job.
tokio::time::timeout(Duration::from_secs(5), pending_flush_job)
.await
.unwrap()
.unwrap();
// Waits for the alter job.
tokio::time::timeout(Duration::from_secs(5), alter_job)
.await
.unwrap()
.unwrap();

let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | a | 0.0 | 1970-01-01T00:00:00 |
| | a | 1.0 | 1970-01-01T00:00:01 |
+-------+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
48 changes: 48 additions & 0 deletions src/mito2/src/engine/listener.rs
@@ -56,6 +56,11 @@ pub trait EventListener: Send + Sync {
async fn on_merge_ssts_finished(&self, region_id: RegionId) {
let _ = region_id;
}

/// Notifies the listener that the worker has received requests from the request channel.
fn on_recv_requests(&self, request_num: usize) {
let _ = request_num;
}
}

pub type EventListenerRef = Arc<dyn EventListener>;
@@ -210,3 +215,46 @@ impl EventListener for CompactionListener {
self.blocker.notified().await;
}
}

/// Listener to block on flush and alter.
#[derive(Default)]
pub struct AlterFlushListener {
flush_begin_notify: Notify,
block_flush_notify: Notify,
request_begin_notify: Notify,
}

impl AlterFlushListener {
/// Waits on flush begin.
pub async fn wait_flush_begin(&self) {
self.flush_begin_notify.notified().await;
}

/// Waits on request begin.
pub async fn wait_request_begin(&self) {
self.request_begin_notify.notified().await;
}

/// Continues the blocked flush job.
pub fn wake_flush(&self) {
self.block_flush_notify.notify_one();
}
}

#[async_trait]
impl EventListener for AlterFlushListener {
async fn on_flush_begin(&self, region_id: RegionId) {
info!("Wait on notify to start flush for region {}", region_id);

self.flush_begin_notify.notify_one();
self.block_flush_notify.notified().await;

info!("region {} begin flush", region_id);
}

fn on_recv_requests(&self, request_num: usize) {
info!("receive {} request", request_num);

self.request_begin_notify.notify_one();
}
}
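The `AlterFlushListener` above coordinates the test and the region worker through three `tokio::sync::Notify` values: `on_flush_begin` announces the flush and then parks on `block_flush_notify` until the test calls `wake_flush`, while `on_recv_requests` releases one permit each time the worker pulls requests from its channel. Because `Notify::notify_one` stores a permit when no task is waiting, neither side can miss a wakeup regardless of ordering. A minimal, self-contained sketch of the same handshake pattern (illustrative names only, not mito2 APIs):

use std::sync::Arc;

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    // worker -> test: the flush has begun (like flush_begin_notify).
    let begun = Arc::new(Notify::new());
    // test -> worker: continue the blocked flush (like block_flush_notify).
    let release = Arc::new(Notify::new());

    let (begun_w, release_w) = (begun.clone(), release.clone());
    let worker = tokio::spawn(async move {
        begun_w.notify_one();       // announce the flush, like on_flush_begin
        release_w.notified().await; // block until the test wakes us up
        // ... the actual flush would run here ...
    });

    begun.notified().await; // like listener.wait_flush_begin().await
    // While the flush is blocked, the test can submit requests that must queue
    // behind it (the alter and extra flush requests in test_alter_on_flushing).
    release.notify_one(); // like listener.wake_flush()
    worker.await.unwrap();
}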
102 changes: 98 additions & 4 deletions src/mito2/src/flush.rs
@@ -523,11 +523,26 @@ impl FlushScheduler {

let pending_requests = if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status exists.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
None
let version_data = flush_status.version_control.current();
if version_data.version.memtables.is_empty() {
// The region has nothing to flush, so we can finish the pending task and also
// remove the region from the status.
// Safety: The pending task is not None.
let task = flush_status.pending_task.take().unwrap();
// Notifies the pending task that there is nothing left to flush.
task.on_success();
// `schedule_next_flush()` may pick up the same region to flush, so we must remove
// it from the status to avoid leaking pending requests.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
// We can flush the region again, keep it in the region status.
None
}
};

// Schedule next flush job.
@@ -718,8 +733,9 @@ mod tests {

use super::*;
use crate::cache::CacheManager;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

#[test]
fn test_get_mutable_limit() {
@@ -807,4 +823,82 @@
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
}

#[tokio::test]
async fn test_schedule_pending_request() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_flush_scheduler();
let mut builder = VersionControlBuilder::new();
// Overwrites the empty memtable builder.
builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
let version_control = Arc::new(builder.build());
// Writes data to the memtable so it is not empty.
let version_data = version_control.current();
write_rows_to_version(&version_data.version, "host0", 0, 10);
let manifest_ctx = env
.mock_manifest_context(version_data.version.metadata.clone())
.await;
// Creates 3 tasks.
let mut tasks: Vec<_> = (0..3)
.map(|_| RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
senders: Vec::new(),
request_sender: tx.clone(),
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
})
.collect();
// Schedule first task.
let task = tasks.pop().unwrap();
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
// Should schedule 1 flush.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
// Check the new version.
let version_data = version_control.current();
assert_eq!(0, version_data.version.memtables.immutables()[0].id());
// Schedule remaining tasks.
let output_rxs: Vec<_> = tasks
.into_iter()
.map(|mut task| {
let (output_tx, output_rx) = oneshot::channel();
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
output_rx
})
.collect();
// Assumes the flush job is finished.
version_control.apply_edit(
RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
&[0],
builder.file_purger(),
);
scheduler.on_flush_success(builder.region_id());
// No new flush task.
assert_eq!(1, job_scheduler.num_jobs());
// The flush status is cleared.
assert!(scheduler.region_status.is_empty());
for output_rx in output_rxs {
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
}
}
}
3 changes: 0 additions & 3 deletions src/mito2/src/test_util.rs
@@ -737,17 +737,14 @@ impl CreateRequestBuilder {
}
}

// TODO(yingwen): Support conversion in greptime-proto.
/// Creates value for i64.
#[cfg(test)]
pub(crate) fn i64_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::I64Value(data)),
}
}

/// Creates value for timestamp millis.
#[cfg(test)]
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(data)),