From 85eb0b7d7f221ccd1aa3247d9a1699edab90a21a Mon Sep 17 00:00:00 2001
From: WEI Xikai
Date: Mon, 19 Jun 2023 19:58:45 +0800
Subject: [PATCH] fix: avoid any updates after table is closed (#998)

## Rationale
Part of #990.

Some background jobs are still allowed to execute after a table is closed. When the table is migrated between different nodes, this leaves the same table with multiple writers and leads to data corruption.

## Detailed Changes
Introduce a flag called `invalid` in the table data to denote whether the serial executor is still valid. The flag is protected together with the `TableOpSerialExecutor` in the table data, and the `TableOpSerialExecutor` cannot be acquired once the flag is set. In other words, any table operation that requires the `TableOpSerialExecutor` (updating the manifest, altering the table, and so on) can no longer be executed after the flag is set. The flag is set when the table is closed. A short sketch of this acquire-or-fail pattern, using simplified stand-in types, follows the diff.

---
 analytic_engine/src/compaction/scheduler.rs  |  11 +-
 analytic_engine/src/instance/close.rs        |  16 +--
 analytic_engine/src/instance/drop.rs         |  11 +-
 analytic_engine/src/instance/engine.rs       | 101 +++++------------
 analytic_engine/src/instance/mod.rs          |  15 ++-
 analytic_engine/src/instance/wal_replayer.rs |  34 +++---
 analytic_engine/src/instance/write.rs        |   8 +-
 analytic_engine/src/table/data.rs            | 107 +++++++++++++------
 analytic_engine/src/table/mod.rs             |  74 ++++++++-----
 table_engine/src/table.rs                    |   3 +
 10 files changed, 219 insertions(+), 161 deletions(-)

diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index 77acc23ac2..866629678d 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -650,7 +650,16 @@ impl ScheduleWorker {
                 self.max_unflushed_duration,
             );
 
-            let mut serial_exec = table_data.serial_exec.lock().await;
+            let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
+                v
+            } else {
+                warn!(
+                    "Table is closed, ignore this periodical flush, table:{}",
+                    table_data.name
+                );
+                continue;
+            };
+
             let flush_scheduler = serial_exec.flush_scheduler();
             // Instance flush the table asynchronously.
             if let Err(e) = flusher
diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs
index f45199c164..d933b01214 100644
--- a/analytic_engine/src/instance/close.rs
+++ b/analytic_engine/src/instance/close.rs
@@ -3,12 +3,12 @@
 //! Close table logic of instance
 
 use log::{info, warn};
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use table_engine::engine::CloseTableRequest;
 
 use crate::{
     instance::{
-        engine::{DoManifestSnapshot, FlushTable, Result},
+        engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
         flush_compaction::{Flusher, TableFlushOptions},
     },
     manifest::{ManifestRef, SnapshotRequest},
@@ -37,8 +37,11 @@ impl Closer {
 
         // Flush table.
let opts = TableFlushOptions::default(); - let mut serial_exec = table_data.serial_exec.lock().await; - let flush_scheduler = serial_exec.flush_scheduler(); + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) @@ -67,9 +70,10 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); + serial_exec_ctx.invalidate(); info!( - "table:{}-{} has been removed from the space_id:{}", - table_data.name, table_data.id, self.space.id + "table:{} has been removed from the space_id:{}, table_id:{}", + table_data.name, self.space.id, table_data.id, ); Ok(()) } diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index ac6d1653fd..08d673f820 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -3,12 +3,12 @@ //! Drop table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::DropTableRequest; use crate::{ instance::{ - engine::{FlushTable, Result, WriteManifest}, + engine::{FlushTable, OperateClosedTable, Result, WriteManifest}, flush_compaction::{Flusher, TableFlushOptions}, SpaceStoreRef, }, @@ -36,7 +36,10 @@ impl Dropper { } }; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; if table_data.is_dropped() { warn!( @@ -51,7 +54,7 @@ impl Dropper { // be avoided. let opts = TableFlushOptions::default(); - let flush_scheduler = serial_exec.flush_scheduler(); + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) .await diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index e32fa064d5..562c97ff2e 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -23,29 +23,21 @@ use crate::{ #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { - #[snafu(display( - "The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}", - space_id, - table, - backtrace, - ))] + #[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))] SpaceNotExist { space_id: SpaceId, table: String, backtrace: Backtrace, }, - #[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))] + #[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))] ReadMetaUpdate { table_id: TableId, source: GenericError, }, #[snafu(display( - "Failed to recover table data, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}" ))] RecoverTableData { space_id: SpaceId, @@ -53,14 +45,11 @@ pub enum Error { source: crate::table::data::Error, }, - #[snafu(display("Failed to read wal, err:{}", source))] + #[snafu(display("Failed to read wal, err:{source}"))] ReadWal { source: wal::manager::Error }, #[snafu(display( - "Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}", - table, - table_id, - source + "Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}", ))] ApplyMemTable { space_id: SpaceId, @@ -70,11 +59,7 @@ pub enum Error { }, 
#[snafu(display( - "Flush failed, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] FlushTable { space_id: SpaceId, @@ -84,11 +69,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteManifest { space_id: SpaceId, @@ -98,11 +79,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteWal { space_id: SpaceId, @@ -112,11 +89,7 @@ pub enum Error { }, #[snafu(display( - "Invalid options, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] InvalidOptions { space_id: SpaceId, @@ -126,11 +99,7 @@ pub enum Error { }, #[snafu(display( - "Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] CreateTableData { space_id: SpaceId, @@ -140,11 +109,8 @@ pub enum Error { }, #[snafu(display( - "Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}", - table, - current_version, - given_version, - backtrace, + "Try to update schema to elder version, table:{table}, current_version:{current_version}, \ + given_version:{given_version}.\nBacktrace:\n{backtrace}", ))] InvalidSchemaVersion { table: String, @@ -154,11 +120,8 @@ pub enum Error { }, #[snafu(display( - "Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}", - table, - current_version, - pre_version, - backtrace, + "Invalid previous schema version, table:{table}, current_version:{current_version}, \ + pre_version:{pre_version}.\nBacktrace:\n{backtrace}", ))] InvalidPreVersion { table: String, @@ -167,21 +130,14 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Alter schema of a dropped table:{}.\nBacktrace:\n{}", - table, - backtrace - ))] + #[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))] AlterDroppedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to store version edit, err:{}", source))] + #[snafu(display("Failed to store version edit, err:{source}"))] StoreVersionEdit { source: GenericError }, #[snafu(display( - "Failed to encode payloads, table:{}, wal_location:{:?}, err:{}", - table, - wal_location, - source + "Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}" ))] EncodePayloads { table: String, @@ -190,10 +146,7 @@ pub enum Error { }, #[snafu(display( - "Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}", ))] DoManifestSnapshot { space_id: SpaceId, @@ -202,30 +155,31 @@ pub enum Error { }, #[snafu(display( - "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", - table, - 
backtrace, + "Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}", ))] CreateOpenFailedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to open manifest, err:{}", source))] + #[snafu(display("Failed to open manifest, err:{source}"))] OpenManifest { source: crate::manifest::details::Error, }, - #[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableNotExist { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + + #[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))] ReplayWalWithCause { msg: Option, source: GenericError, }, - #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] ReplayWalNoCause { msg: Option, backtrace: Backtrace, @@ -264,6 +218,7 @@ impl From for table_engine::engine::Error { | Error::TableNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } + | Error::OperateClosedTable { .. } | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 1faf254f08..492178b41c 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -28,14 +28,14 @@ use common_util::{ }; use log::{error, info}; use mem_collector::MemUsageCollector; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; use tokio::sync::oneshot::{self, error::RecvError}; use wal::manager::{WalLocation, WalManagerRef}; -use self::flush_compaction::{Flusher, TableFlushOptions}; use crate::{ compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest}, + instance::flush_compaction::{Flusher, TableFlushOptions}, manifest::ManifestRef, row_iter::IterOptions, space::{SpaceId, SpaceRef, SpacesRef}, @@ -66,6 +66,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { table: String, backtrace: Backtrace }, + #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] RecvManualOpResult { op: String, @@ -195,7 +198,13 @@ impl Instance { }; let flusher = self.make_flusher(); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = + table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable { + table: &table_data.name, + })?; let flush_scheduler = serial_exec.flush_scheduler(); flusher .schedule_flush(flush_scheduler, table_data, flush_opts) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 5d494450e4..0144833ec6 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use 
common_types::{schema::IndexInWriterSchema, table::ShardId}; use common_util::error::BoxError; use log::{debug, error, info, trace}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::table::TableId; use tokio::sync::MutexGuard; use wal::{ @@ -25,13 +25,13 @@ use wal::{ use crate::{ instance::{ self, - engine::{Error, ReplayWalWithCause, Result}, + engine::{Error, OperateClosedTable, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, payload::{ReadPayload, WalDecoder}, - table::data::TableDataRef, + table::data::{SerialExecContext, TableDataRef}, }; /// Wal replayer supporting both table based and region based @@ -182,7 +182,10 @@ impl TableBasedReplay { .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf @@ -264,14 +267,17 @@ impl RegionBasedReplay { let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. - let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut replay_table_ctxs = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { - let serial_exec = table_data.serial_exec.lock().await; - let serial_exec_ctx = SerialExecContext { + let serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let replay_table_ctx = TableReplayContext { table_data: table_data.clone(), - serial_exec, + serial_exec_ctx, }; - serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + replay_table_ctxs.insert(table_data.id, replay_table_ctx); } // Split and replay logs. 
@@ -287,7 +293,7 @@ impl RegionBasedReplay { break; } - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds) .await?; } @@ -297,7 +303,7 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &mut HashMap>, + serial_exec_ctxs: &mut HashMap>, faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); @@ -317,7 +323,7 @@ impl RegionBasedReplay { let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, - &mut ctx.serial_exec, + &mut ctx.serial_exec_ctx, &ctx.table_data, log_batch.range(table_batch.range), ) @@ -391,9 +397,9 @@ struct TableBatch { range: Range, } -struct SerialExecContext<'a> { +struct TableReplayContext<'a> { table_data: TableDataRef, - serial_exec: MutexGuard<'a, TableOpSerialExecutor>, + serial_exec_ctx: MutexGuard<'a, SerialExecContext>, } /// Replay all log entries into memtable and flush if necessary diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 219e86102c..6017686b9a 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -608,9 +608,9 @@ impl<'a> Writer<'a> { "Try to trigger flush of other table:{} from the write procedure of table:{}", table_data.name, self.table_data.name ); - match table_data.serial_exec.try_lock() { - Ok(mut serial_exec) => { - let flush_scheduler = serial_exec.flush_scheduler(); + match table_data.try_acquire_serial_exec_ctx() { + Some(mut serial_exec_ctx) => { + let flush_scheduler = serial_exec_ctx.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. flusher .schedule_flush(flush_scheduler, table_data, opts) @@ -619,7 +619,7 @@ impl<'a> Writer<'a> { table: &table_data.name, }) } - Err(_) => { + None => { warn!( "Failed to acquire write lock for flush table:{}", table_data.name, diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 49d9d6cae8..01ba153fd8 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -7,6 +7,7 @@ use std::{ convert::TryInto, fmt, fmt::Formatter, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Mutex, @@ -28,6 +29,7 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; +use tokio::sync::MutexGuard; use crate::{ instance::serial_executor::TableOpSerialExecutor, @@ -86,6 +88,36 @@ impl TableShardInfo { } } +/// The context for execution of serial operation on the table. +pub struct SerialExecContext { + /// Denotes whether `serial_exec` is valid. + /// + /// The `serial_exec` will be invalidated if the table is closed. + invalid: bool, + serial_exec: TableOpSerialExecutor, +} + +impl SerialExecContext { + #[inline] + pub fn invalidate(&mut self) { + self.invalid = true; + } +} + +impl Deref for SerialExecContext { + type Target = TableOpSerialExecutor; + + fn deref(&self) -> &Self::Target { + &self.serial_exec + } +} + +impl DerefMut for SerialExecContext { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.serial_exec + } +} + /// Data of a table pub struct TableData { /// Id of this table @@ -143,14 +175,13 @@ pub struct TableData { /// No write/alter is allowed if the table is dropped. 
dropped: AtomicBool, + serial_exec_ctx: tokio::sync::Mutex, + /// Metrics of this table pub metrics: Metrics, /// Shard info of the table pub shard_info: TableShardInfo, - - /// The table operation serial_exec - pub serial_exec: tokio::sync::Mutex, } impl fmt::Debug for TableData { @@ -217,6 +248,10 @@ impl TableData { preflush_write_buffer_size_ratio, )); + let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { + invalid: false, + serial_exec: TableOpSerialExecutor::new(table_id), + }); Ok(Self { id: table_id, name: table_name, @@ -235,7 +270,7 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), + serial_exec_ctx, }) } @@ -249,35 +284,17 @@ impl TableData { preflush_write_buffer_size_ratio: f32, mem_usage_collector: CollectorRef, ) -> Result { - let memtable_factory = Arc::new(SkiplistMemTableFactory); - let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); - let current_version = TableVersion::new(purge_queue); - let metrics = Metrics::default(); - let mutable_limit = AtomicU32::new(compute_mutable_limit( - add_meta.opts.write_buffer_size, + Self::new( + add_meta.space_id, + add_meta.table_id, + add_meta.table_name, + add_meta.schema, + shard_id, + add_meta.opts, + purger, preflush_write_buffer_size_ratio, - )); - - Ok(Self { - id: add_meta.table_id, - name: add_meta.table_name, - schema: Mutex::new(add_meta.schema), - space_id: add_meta.space_id, - mutable_limit, - mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio, - opts: ArcSwap::new(Arc::new(add_meta.opts)), - memtable_factory, mem_usage_collector, - current_version, - last_sequence: AtomicU64::new(0), - last_memtable_id: AtomicU64::new(0), - last_file_id: AtomicU64::new(0), - last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), - metrics, - shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), - }) + ) } /// Get current schema of the table. @@ -352,6 +369,34 @@ impl TableData { self.dropped.store(true, Ordering::SeqCst); } + /// Acquire the [`SerialExecContext`] if the table is not closed. + pub async fn acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.lock().await; + if v.invalid { + None + } else { + Some(v) + } + } + + /// Try to acquire the [SerialExecContext]. + /// + /// [None] will be returned if the serial_exec_ctx has been acquired + /// already, or the table is closed. + pub fn try_acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.try_lock(); + match v { + Ok(ctx) => { + if ctx.invalid { + None + } else { + Some(ctx) + } + } + Err(_) => None, + } + } + /// Returns total memtable memory usage in bytes. 
#[inline] pub fn memtable_memory_usage(&self) -> usize { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 82b2b54b9c..780b9cafcc 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -21,9 +21,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder, - ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, - WaitForPendingWrites, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, OperateClosedTable, + ReadOptions, ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, + TooManyPendingWrites, WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -255,7 +255,7 @@ impl TableImpl { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec = self.table_data.serial_exec.lock().await; + let serial_exec_ctx = self.table_data.acquire_serial_exec_ctx().await; // The `serial_exec` is acquired, let's merge the pending requests and write // them all. let pending_writes = { @@ -266,9 +266,22 @@ impl TableImpl { !pending_writes.is_empty(), "The pending writes should contain at least the one just pushed." ); - let merged_write_request = - merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); - (merged_write_request, serial_exec, pending_writes.notifiers) + match serial_exec_ctx { + Some(v) => { + let merged_write_request = merge_pending_write_requests( + pending_writes.writes, + pending_writes.num_rows, + ); + (merged_write_request, v, pending_writes.notifiers) + } + None => { + // The table has been closed, notify all the waiters in + // the queue. + let write_err = OperateClosedTable.fail(); + self.notify_waiters(pending_writes.notifiers, &write_err); + return write_err; + } + } } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -303,12 +316,18 @@ impl TableImpl { .box_err() .context(Write { table: self.name() }); - // There is no waiter for pending writes, return the write result. - if notifiers.is_empty() { - return write_res; - } - // Notify the waiters for the pending writes. 
+ self.notify_waiters(notifiers, &write_res); + + write_res.map(|_| num_rows) + } + + #[inline] + fn should_queue_write_request(&self, request: &WriteRequest) -> bool { + request.row_group.num_rows() < self.instance.max_rows_in_write_queue + } + + fn notify_waiters(&self, notifiers: Vec>>, write_res: &Result) { match write_res { Ok(_) => { for notifier in notifiers { @@ -319,7 +338,6 @@ impl TableImpl { ); } } - Ok(num_rows) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -332,15 +350,9 @@ impl TableImpl { ); } } - Err(e) } } } - - #[inline] - fn should_queue_write_request(&self, request: &WriteRequest) -> bool { - request.row_group.num_rows() < self.instance.max_rows_in_write_queue - } } #[async_trait] @@ -384,11 +396,15 @@ impl Table for TableImpl { return self.write_with_pending_queue(request).await; } - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut writer = Writer::new( self.instance.clone(), self.space_table.clone(), - &mut serial_exec, + &mut serial_exec_ctx, ); writer .write(request) @@ -501,10 +517,14 @@ impl Table for TableImpl { } async fn alter_schema(&self, request: AlterSchemaRequest) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; @@ -518,10 +538,14 @@ impl Table for TableImpl { } async fn alter_options(&self, options: HashMap) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 821bdb6195..aea9f64cf5 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -135,6 +135,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + #[snafu(display( "Failed to wait for pending writes, table:{table}.\nBacktrace:\n{backtrace}" ))]
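Below is a minimal, self-contained sketch of the acquire-or-fail pattern the patch applies. The names (`SerialExecCtx`, `TableData`) and the `main` driver are simplified stand-ins for illustration only, not the engine's actual types; the real `SerialExecContext` also wraps and derefs to a `TableOpSerialExecutor`, which is elided here.

```rust
use tokio::sync::{Mutex, MutexGuard};

/// Stand-in for the patch's `SerialExecContext`; the wrapped
/// `TableOpSerialExecutor` is elided to keep the sketch small.
struct SerialExecCtx {
    /// Set once the table is closed; afterwards the context can no
    /// longer be acquired.
    invalid: bool,
}

impl SerialExecCtx {
    fn invalidate(&mut self) {
        self.invalid = true;
    }
}

/// Stand-in for the engine's `TableData`.
struct TableData {
    serial_exec_ctx: Mutex<SerialExecCtx>,
}

impl TableData {
    /// Returns `None` once the table has been closed, so flush,
    /// compaction, alter and manifest updates all bail out.
    async fn acquire_serial_exec_ctx(&self) -> Option<MutexGuard<'_, SerialExecCtx>> {
        let guard = self.serial_exec_ctx.lock().await;
        if guard.invalid {
            None
        } else {
            Some(guard)
        }
    }
}

#[tokio::main]
async fn main() {
    let table = TableData {
        serial_exec_ctx: Mutex::new(SerialExecCtx { invalid: false }),
    };

    // Normal path: the context is handed out for a serialized operation.
    {
        let mut ctx = table
            .acquire_serial_exec_ctx()
            .await
            .expect("table is still open");
        // ... run the close logic, then invalidate while still holding the guard ...
        ctx.invalidate();
    }

    // Every later acquisition fails, so no update can slip in after close.
    assert!(table.acquire_serial_exec_ctx().await.is_none());
}
```

Keeping the flag inside the same mutex as the executor makes checking the flag and handing out the executor a single atomic step, which is what closes the race between closing a table and the background jobs that still reference it.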