Skip to content

Commit

Permalink
feat: support multi-statement transactions (#14562)
Browse files Browse the repository at this point in the history
* init

* state machine

* init txn catalog

* replace default catalog

* fix is_ddl

* revert

* modify catalog interface

* make lint

* fix ut

* fix state

* fix mysql check

* refactor

* get table meta from local buffer

* commit

* fix todo

* fix todo

* rr stream

* make lint

* fix header

* fix mysql

* rename function

* fix todo

* fix error code

* fix mysql error capture

* remove debug info

* adjust syntax

* try fix stream

* fix

* make lint

* make lint

* fix refresh table

* respect pr #14647

* support dedup label

* fix merge

* refactor txn buffer

* add sql logic tests (provided by @SkyFan2002)

* fix: mv sqlogic test txn_stream into ee directory

* get_table_copied_file_info from txn buffer

* tweak file structure

* delay purge files

* clear table cache in ddl

* use parking_lot::Mutex

---------

Co-authored-by: dantengsky <dantengsky@gmail.com>
  • Loading branch information
SkyFan2002 and dantengsky authored Feb 26, 2024
1 parent 451d463 commit b7a7432
Show file tree
Hide file tree
Showing 176 changed files with 2,065 additions and 70 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ build_exceptions! {
InvalidOperation(3905),
StorageOther(4000),
UnresolvableConflict(4001),

//transaction error codes
CurrentTransactionIsAborted(4002),
}

// Service errors [5001,6000].
Expand Down
4 changes: 4 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateIndexReply;
use databend_common_meta_app::schema::UpdateIndexReq;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReply;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_app::schema::UpdateVirtualColumnReply;
Expand Down Expand Up @@ -230,6 +231,9 @@ pub trait SchemaApi: Send + Sync {
req: UpdateTableMetaReq,
) -> Result<UpdateTableMetaReply, KVAppError>;

async fn update_multi_table_meta(&self, req: UpdateMultiTableMetaReq)
-> Result<(), KVAppError>;

async fn set_table_column_mask_policy(
&self,
req: SetTableColumnMaskPolicyReq,
Expand Down
111 changes: 111 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_app::app_error::DropTableWithDropTime;
use databend_common_meta_app::app_error::DuplicatedUpsertFiles;
use databend_common_meta_app::app_error::GetIndexWithDropTime;
use databend_common_meta_app::app_error::IndexAlreadyExists;
use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
use databend_common_meta_app::app_error::ShareHasNoGrantedPrivilege;
use databend_common_meta_app::app_error::StreamAlreadyExists;
use databend_common_meta_app::app_error::StreamVersionMismatched;
Expand Down Expand Up @@ -159,6 +160,7 @@ use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateIndexReply;
use databend_common_meta_app::schema::UpdateIndexReq;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReply;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_app::schema::UpdateVirtualColumnReply;
Expand Down Expand Up @@ -2919,6 +2921,115 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

async fn update_multi_table_meta(
&self,
req: UpdateMultiTableMetaReq,
) -> Result<(), KVAppError> {
let UpdateMultiTableMetaReq {
update_table_metas,
copied_files,
update_stream_metas,
deduplicated_labels,
} = req;
let mut tbl_seqs = HashMap::new();
let mut txn_req = TxnRequest {
condition: vec![],
if_then: vec![],
else_then: vec![],
};
for req in update_table_metas {
let tbid = TableId {
table_id: req.table_id,
};
let req_seq = req.seq;

let (tb_meta_seq, table_meta): (_, Option<TableMeta>) =
get_pb_value(self, &tbid).await?;

if tb_meta_seq == 0 || table_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(req.table_id, "update_table_meta"),
)));
}
if req_seq.match_seq(tb_meta_seq).is_err() {
return Err(KVAppError::AppError(AppError::from(
TableVersionMismatched::new(
req.table_id,
req.seq,
tb_meta_seq,
"update_table_meta",
),
)));
}
tbl_seqs.insert(req.table_id, tb_meta_seq);
txn_req.condition.push(txn_cond_seq(&tbid, Eq, tb_meta_seq));
txn_req
.if_then
.push(txn_op_put(&tbid, serialize_struct(&req.new_table_meta)?));
}
for (tbid, req) in copied_files {
let tbid = TableId { table_id: tbid };
let (conditions, match_operations) = build_upsert_table_copied_file_info_conditions(
&tbid,
&req,
tbl_seqs[&tbid.table_id],
req.fail_if_duplicated,
)?;
txn_req.condition.extend(conditions);
txn_req.if_then.extend(match_operations)
}

for req in &update_stream_metas {
let stream_id = TableId {
table_id: req.stream_id,
};
let (stream_meta_seq, stream_meta): (_, Option<TableMeta>) =
get_pb_value(self, &stream_id).await?;

if stream_meta_seq == 0 || stream_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownStreamId(
UnknownStreamId::new(req.stream_id, "update_table_meta"),
)));
}

if req.seq.match_seq(stream_meta_seq).is_err() {
return Err(KVAppError::AppError(AppError::from(
StreamVersionMismatched::new(
req.stream_id,
req.seq,
stream_meta_seq,
"update_table_meta",
),
)));
}

let mut new_stream_meta = stream_meta.unwrap();
new_stream_meta.options = req.options.clone();
new_stream_meta.updated_on = Utc::now();

txn_req
.condition
.push(txn_cond_seq(&stream_id, Eq, stream_meta_seq));
txn_req
.if_then
.push(txn_op_put(&stream_id, serialize_struct(&new_stream_meta)?));
}

for deduplicated_label in deduplicated_labels {
txn_req
.if_then
.push(build_upsert_table_deduplicated_label(deduplicated_label));
}
let (succ, _) = send_txn(self, txn_req).await?;

if succ {
return Ok(());
}
Err(KVAppError::AppError(AppError::from(
MultiStmtTxnCommitFailed::new("update_multi_table_meta"),
)))
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn set_table_column_mask_policy(
Expand Down
22 changes: 22 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,20 @@ impl UnknownStreamId {
}
}

#[derive(thiserror::Error, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[error("MultiStmtTxnCommitFailed: {context}")]
pub struct MultiStmtTxnCommitFailed {
context: String,
}

impl MultiStmtTxnCommitFailed {
pub fn new(context: impl Into<String>) -> MultiStmtTxnCommitFailed {
Self {
context: context.into(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)]
#[error("DuplicatedUpsertFiles: {table_id} , in operation `{context}`")]
pub struct DuplicatedUpsertFiles {
Expand Down Expand Up @@ -1032,6 +1046,9 @@ pub enum AppError {

#[error(transparent)]
UnknownStreamId(#[from] UnknownStreamId),

#[error(transparent)]
MultiStatementTxnCommitFailed(#[from] MultiStmtTxnCommitFailed),
}

impl AppErrorMessage for UnknownBackgroundJob {
Expand Down Expand Up @@ -1104,6 +1121,8 @@ impl AppErrorMessage for StreamVersionMismatched {}

impl AppErrorMessage for UnknownStreamId {}

impl AppErrorMessage for MultiStmtTxnCommitFailed {}

impl AppErrorMessage for DuplicatedUpsertFiles {}

impl AppErrorMessage for TableAlreadyExists {
Expand Down Expand Up @@ -1450,6 +1469,9 @@ impl From<AppError> for ErrorCode {
AppError::VirtualColumnAlreadyExists(err) => {
ErrorCode::VirtualColumnAlreadyExists(err.message())
}
AppError::MultiStatementTxnCommitFailed(err) => {
ErrorCode::UnresolvableConflict(err.message())
}
}
}
}
1 change: 1 addition & 0 deletions src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub use table::TruncateTableReply;
pub use table::TruncateTableReq;
pub use table::UndropTableReply;
pub use table::UndropTableReq;
pub use table::UpdateMultiTableMetaReq;
pub use table::UpdateStreamMetaReq;
pub use table::UpdateTableMetaReply;
pub use table::UpdateTableMetaReq;
Expand Down
7 changes: 7 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,13 @@ pub struct UpdateTableMetaReq {
pub deduplicated_label: Option<String>,
}

pub struct UpdateMultiTableMetaReq {
pub update_table_metas: Vec<UpdateTableMetaReq>,
pub copied_files: Vec<(u64, UpsertTableCopiedFileReq)>,
pub update_stream_metas: Vec<UpdateStreamMetaReq>,
pub deduplicated_labels: Vec<String>,
}

impl UpsertTableOptionReq {
pub fn new(
table_ident: &TableIdent,
Expand Down
8 changes: 8 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ pub enum Statement {
DescribePipe(DescribePipeStmt),
DropPipe(DropPipeStmt),
AlterPipe(AlterPipeStmt),

// Transactions
Begin,
Commit,
Abort,
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -641,6 +646,9 @@ impl Display for Statement {
Statement::DropConnection(stmt) => write!(f, "{stmt}")?,
Statement::DescribeConnection(stmt) => write!(f, "{stmt}")?,
Statement::ShowConnections(stmt) => write!(f, "{stmt}")?,
Statement::Begin => write!(f, "BEGIN")?,
Statement::Commit => write!(f, "COMMIT")?,
Statement::Abort => write!(f, "ABORT")?,
}
Ok(())
}
Expand Down
12 changes: 12 additions & 0 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1874,6 +1874,15 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
},
);

let begin = map(rule!(BEGIN), |_| Statement::Begin);
let commit = map(rule!(COMMIT), |_| Statement::Commit);
let abort = map(
rule! {
(ABORT | ROLLBACK)
},
|_| Statement::Abort,
);

let statement_body = alt((
// query, explain,show
rule!(
Expand Down Expand Up @@ -1924,6 +1933,9 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
rule!(
#set_variable : "`SET <variable> = <value>`"
| #unset_variable : "`UNSET <variable>`"
|#begin : "`BEGIN`"
| #commit : "`COMMIT`"
| #abort : "`ABORT`"
),
rule!(
#show_tables : "`SHOW [FULL] TABLES [FROM <database>] [<show_limit>]`"
Expand Down
8 changes: 8 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,14 @@ pub enum TokenKind {
PREFIX,
#[token("MODIFIED_AFTER", ignore(ascii_case))]
MODIFIED_AFTER,
#[token("BEGIN", ignore(ascii_case))]
BEGIN,
#[token("COMMIT", ignore(ascii_case))]
COMMIT,
#[token("ABORT", ignore(ascii_case))]
ABORT,
#[token("ROLLBACK", ignore(ascii_case))]
ROLLBACK,
#[token("TEMPORARY", ignore(ascii_case))]
TEMPORARY,
#[token("SECONDS", ignore(ascii_case))]
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,5 +529,8 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::AlterPipe(_) => todo!(),
Statement::DropPipe(_) => todo!(),
Statement::DescribePipe(_) => todo!(),
Statement::Begin => {}
Statement::Commit => {}
Statement::Abort => {}
}
}
3 changes: 3 additions & 0 deletions src/query/ast/src/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,5 +537,8 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::AlterPipe(_) => todo!(),
Statement::DropPipe(_) => todo!(),
Statement::DescribePipe(_) => todo!(),
Statement::Begin => {}
Statement::Commit => {}
Statement::Abort => {}
}
}
1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ databend-common-settings = { path = "../settings" }
databend-common-storage = { path = "../../common/storage" }
databend-common-users = { path = "../users" }
databend-storages-common-table-meta = { path = "../storages/common/table_meta" }
databend-storages-common-txn = { path = "../storages/common/txn" }

arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
Expand Down
19 changes: 19 additions & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateIndexReply;
use databend_common_meta_app::schema::UpdateIndexReq;
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReply;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_app::schema::UpdateVirtualColumnReply;
Expand Down Expand Up @@ -252,6 +253,12 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
req: UpdateTableMetaReq,
) -> Result<UpdateTableMetaReply>;

async fn update_multi_table_meta(&self, _req: UpdateMultiTableMetaReq) -> Result<()> {
Err(ErrorCode::Unimplemented(
"'update_multi_table_meta' not implemented",
))
}

async fn set_table_column_mask_policy(
&self,
req: SetTableColumnMaskPolicyReq,
Expand Down Expand Up @@ -310,4 +317,16 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
fn get_table_engines(&self) -> Vec<StorageDescription> {
unimplemented!()
}

async fn stream_source_table(
&self,
_stream_desc: &str,
_tenant: &str,
_db_name: &str,
_source_table_name: &str,
) -> Result<Arc<dyn Table>> {
Err(ErrorCode::Unimplemented(
"'stream_source_table' not implemented",
))
}
}
Loading

0 comments on commit b7a7432

Please sign in to comment.