Skip to content

Commit

Permalink
Revert "fix: avoid any updates after table is closed (#998)" (#1034)
Browse files Browse the repository at this point in the history
This reverts commit 85eb0b7.

## Rationale
The changes introduced by #998 are not reasonable. Another fix will
address #990.

## Detailed Changes
Revert #998
  • Loading branch information
ShiKaiWi authored Jun 26, 2023
1 parent cb7c907 commit f8471d2
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 219 deletions.
11 changes: 1 addition & 10 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,16 +650,7 @@ impl ScheduleWorker {
self.max_unflushed_duration,
);

let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
v
} else {
warn!(
"Table is closed, ignore this periodical flush, table:{}",
table_data.name
);
continue;
};

let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();
// Instance flush the table asynchronously.
if let Err(e) = flusher
Expand Down
16 changes: 6 additions & 10 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! Close table logic of instance

use log::{info, warn};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use table_engine::engine::CloseTableRequest;

use crate::{
instance::{
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
engine::{DoManifestSnapshot, FlushTable, Result},
flush_compaction::{Flusher, TableFlushOptions},
},
manifest::{ManifestRef, SnapshotRequest},
Expand Down Expand Up @@ -37,11 +37,8 @@ impl Closer {

// Flush table.
let opts = TableFlushOptions::default();
let mut serial_exec_ctx = table_data
.acquire_serial_exec_ctx()
.await
.context(OperateClosedTable)?;
let flush_scheduler = serial_exec_ctx.flush_scheduler();
let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();

self.flusher
.do_flush(flush_scheduler, &table_data, opts)
Expand Down Expand Up @@ -70,10 +67,9 @@ impl Closer {
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

serial_exec_ctx.invalidate();
info!(
"table:{} has been removed from the space_id:{}, table_id:{}",
table_data.name, self.space.id, table_data.id,
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
);
Ok(())
}
Expand Down
11 changes: 4 additions & 7 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! Drop table logic of instance

use log::{info, warn};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use table_engine::engine::DropTableRequest;

use crate::{
instance::{
engine::{FlushTable, OperateClosedTable, Result, WriteManifest},
engine::{FlushTable, Result, WriteManifest},
flush_compaction::{Flusher, TableFlushOptions},
SpaceStoreRef,
},
Expand Down Expand Up @@ -36,10 +36,7 @@ impl Dropper {
}
};

let mut serial_exec_ctx = table_data
.acquire_serial_exec_ctx()
.await
.context(OperateClosedTable)?;
let mut serial_exec = table_data.serial_exec.lock().await;

if table_data.is_dropped() {
warn!(
Expand All @@ -54,7 +51,7 @@ impl Dropper {
// be avoided.

let opts = TableFlushOptions::default();
let flush_scheduler = serial_exec_ctx.flush_scheduler();
let flush_scheduler = serial_exec.flush_scheduler();
self.flusher
.do_flush(flush_scheduler, &table_data, opts)
.await
Expand Down
101 changes: 73 additions & 28 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,44 @@ use crate::{
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))]
#[snafu(display(
"The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}",
space_id,
table,
backtrace,
))]
SpaceNotExist {
space_id: SpaceId,
table: String,
backtrace: Backtrace,
},

#[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))]
#[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))]
ReadMetaUpdate {
table_id: TableId,
source: GenericError,
},

#[snafu(display(
"Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}"
"Failed to recover table data, space_id:{}, table:{}, err:{}",
space_id,
table,
source
))]
RecoverTableData {
space_id: SpaceId,
table: String,
source: crate::table::data::Error,
},

#[snafu(display("Failed to read wal, err:{source}"))]
#[snafu(display("Failed to read wal, err:{}", source))]
ReadWal { source: wal::manager::Error },

#[snafu(display(
"Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}",
"Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}",
table,
table_id,
source
))]
ApplyMemTable {
space_id: SpaceId,
Expand All @@ -59,7 +70,11 @@ pub enum Error {
},

#[snafu(display(
"Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
"Flush failed, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
FlushTable {
space_id: SpaceId,
Expand All @@ -69,7 +84,11 @@ pub enum Error {
},

#[snafu(display(
"Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
"Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
WriteManifest {
space_id: SpaceId,
Expand All @@ -79,7 +98,11 @@ pub enum Error {
},

#[snafu(display(
"Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
"Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
WriteWal {
space_id: SpaceId,
Expand All @@ -89,7 +112,11 @@ pub enum Error {
},

#[snafu(display(
"Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
"Invalid options, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
InvalidOptions {
space_id: SpaceId,
Expand All @@ -99,7 +126,11 @@ pub enum Error {
},

#[snafu(display(
"Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
"Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
CreateTableData {
space_id: SpaceId,
Expand All @@ -109,8 +140,11 @@ pub enum Error {
},

#[snafu(display(
"Try to update schema to elder version, table:{table}, current_version:{current_version}, \
given_version:{given_version}.\nBacktrace:\n{backtrace}",
"Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}",
table,
current_version,
given_version,
backtrace,
))]
InvalidSchemaVersion {
table: String,
Expand All @@ -120,8 +154,11 @@ pub enum Error {
},

#[snafu(display(
"Invalid previous schema version, table:{table}, current_version:{current_version}, \
pre_version:{pre_version}.\nBacktrace:\n{backtrace}",
"Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}",
table,
current_version,
pre_version,
backtrace,
))]
InvalidPreVersion {
table: String,
Expand All @@ -130,14 +167,21 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))]
#[snafu(display(
"Alter schema of a dropped table:{}.\nBacktrace:\n{}",
table,
backtrace
))]
AlterDroppedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to store version edit, err:{source}"))]
#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit { source: GenericError },

#[snafu(display(
"Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}"
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
table,
wal_location,
source
))]
EncodePayloads {
table: String,
Expand All @@ -146,7 +190,10 @@ pub enum Error {
},

#[snafu(display(
"Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}",
"Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}",
space_id,
table,
source
))]
DoManifestSnapshot {
space_id: SpaceId,
Expand All @@ -155,31 +202,30 @@ pub enum Error {
},

#[snafu(display(
"Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}",
"Table open failed and can not be created again, table:{}.\nBacktrace:\n{}",
table,
backtrace,
))]
CreateOpenFailedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to open manifest, err:{source}"))]
#[snafu(display("Failed to open manifest, err:{}", source))]
OpenManifest {
source: crate::manifest::details::Error,
},

#[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableNotExist { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
OpenTablesOfShard { msg: String, backtrace: Backtrace },

#[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))]
OperateClosedTable { backtrace: Backtrace },

#[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))]
#[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))]
ReplayWalWithCause {
msg: Option<String>,
source: GenericError,
},

#[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))]
ReplayWalNoCause {
msg: Option<String>,
backtrace: Backtrace,
Expand Down Expand Up @@ -218,7 +264,6 @@ impl From<Error> for table_engine::engine::Error {
| Error::TableNotExist { .. }
| Error::OpenTablesOfShard { .. }
| Error::ReplayWalNoCause { .. }
| Error::OperateClosedTable { .. }
| Error::ReplayWalWithCause { .. } => Self::Unexpected {
source: Box::new(err),
},
Expand Down
15 changes: 3 additions & 12 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use common_util::{
};
use log::{error, info};
use mem_collector::MemUsageCollector;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
use tokio::sync::oneshot::{self, error::RecvError};
use wal::manager::{WalLocation, WalManagerRef};

use self::flush_compaction::{Flusher, TableFlushOptions};
use crate::{
compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest},
instance::flush_compaction::{Flusher, TableFlushOptions},
manifest::ManifestRef,
row_iter::IterOptions,
space::{SpaceId, SpaceRef, SpacesRef},
Expand Down Expand Up @@ -66,9 +66,6 @@ pub enum Error {
source: GenericError,
},

#[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))]
OperateClosedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))]
RecvManualOpResult {
op: String,
Expand Down Expand Up @@ -198,13 +195,7 @@ impl Instance {
};

let flusher = self.make_flusher();
let mut serial_exec =
table_data
.acquire_serial_exec_ctx()
.await
.context(OperateClosedTable {
table: &table_data.name,
})?;
let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();
flusher
.schedule_flush(flush_scheduler, table_data, flush_opts)
Expand Down
Loading

0 comments on commit f8471d2

Please sign in to comment.