From b1711e5ccf94d56d6a24586696d351bfe7bba9ea Mon Sep 17 00:00:00 2001 From: tom1484 Date: Tue, 19 Mar 2024 10:31:17 +0800 Subject: [PATCH] use transaction in mutations --- editor-server/src/graphql/mutations/color.rs | 176 +++++++++--------- .../src/graphql/mutations/control_frame.rs | 157 +++++----------- .../src/graphql/mutations/control_map.rs | 34 ++-- editor-server/src/graphql/mutations/dancer.rs | 32 +++- editor-server/src/graphql/mutations/led.rs | 54 +++--- editor-server/src/graphql/mutations/model.rs | 20 +- editor-server/src/graphql/mutations/part.rs | 136 +++----------- .../src/graphql/mutations/position_frame.rs | 133 ++++--------- .../src/graphql/mutations/position_map.rs | 27 ++- .../src/graphql/mutations/request_edit.rs | 85 +++++---- editor-server/src/graphql/mutations/shift.rs | 125 ++++--------- editor-server/src/utils/data.rs | 56 +++--- editor-server/src/utils/revision.rs | 16 +- 13 files changed, 434 insertions(+), 617 deletions(-) diff --git a/editor-server/src/graphql/mutations/color.rs b/editor-server/src/graphql/mutations/color.rs index a9116cc33..c4d0f9d6d 100644 --- a/editor-server/src/graphql/mutations/color.rs +++ b/editor-server/src/graphql/mutations/color.rs @@ -64,6 +64,7 @@ impl ColorMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let _ = sqlx::query!( r#" @@ -76,22 +77,24 @@ impl ColorMutation { data.color_code.set[2], id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + + // commit the transaction + tx.commit().await?; + let color_payload = ColorPayload { mutation: ColorMutationMode::Updated, id, color: Some(data.color.set.clone()), color_code: Some(data.color_code.set.clone()), edit_by: context.user_id, - // edit_by: 0, }; Subscriptor::publish(color_payload); - update_revision(mysql).await?; - let color = Color { id, color: data.color.set, @@ -106,6 +109,7 @@ impl ColorMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let id = sqlx::query!( r#" @@ -117,21 +121,11 @@ impl ColorMutation { color.color_code.set[1], color.color_code.set[2] ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32; - let color_payload = ColorPayload { - mutation: ColorMutationMode::Created, - id, - color: Some(color.color.clone()), - color_code: Some(color.color_code.set.clone()), - edit_by: context.user_id, - }; - - Subscriptor::publish(color_payload); - - if color.auto_create_effect.unwrap_or(false) { + let led_payload = if color.auto_create_effect.unwrap_or(false) { let model_parts = sqlx::query!( r#" SELECT @@ -145,7 +139,7 @@ impl ColorMutation { WHERE Part.type = 'LED'; "# ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await? .into_iter() .map(|row| { @@ -154,49 +148,43 @@ impl ColorMutation { }) .collect_vec(); - let create_effect_futures = model_parts.iter().map(|model_part| { - sqlx::query!( - r#" - INSERT INTO LEDEffect (name, model_id, part_id) - VALUES (?, ?, ?); - "#, - color.color.clone(), - model_part.0, - model_part.1 - ) - .execute(mysql) - }); - - let create_effect_results = futures::future::join_all(create_effect_futures) - .await - .into_iter() - .filter_map(|result| result.ok()) - .collect_vec(); - - let mut create_states_futures = Vec::new(); - create_effect_results - .iter() - .zip(&model_parts) - .for_each(|(result, model_part)| { - for pos in 0..model_part.2 { - create_states_futures.push( - sqlx::query!( - r#" - INSERT INTO LEDEffectState (effect_id, position, color_id, alpha) - VALUES (?, ?, ?, ?) - ON DUPLICATE KEY UPDATE color_id = VALUES(color_id), alpha = VALUES(alpha); - "#, - result.last_insert_id() as i32, - pos, - id, - 255 - ) - .execute(mysql), - ) - } - }); - - futures::future::join_all(create_states_futures).await; + let mut create_effect_results = Vec::new(); + for model_part in model_parts.iter() { + create_effect_results.push( + sqlx::query!( + r#" + INSERT INTO LEDEffect (name, model_id, part_id) + VALUES (?, ?, ?); + "#, + color.color.clone(), + model_part.0, + model_part.1 + ) + .execute(&mut *tx) + .await?, + ); + } + + let mut create_states_results = Vec::new(); + for (result, model_part) in create_effect_results.iter().zip(&model_parts) { + for pos in 0..model_part.2 { + create_states_results.push( + sqlx::query!( + r#" + INSERT INTO LEDEffectState (effect_id, position, color_id, alpha) + VALUES (?, ?, ?, ?) + ON DUPLICATE KEY UPDATE color_id = VALUES(color_id), alpha = VALUES(alpha); + "#, + result.last_insert_id() as i32, + pos, + id, + 255 + ) + .execute(&mut *tx) + .await?, + ) + } + } let create_effects = create_effect_results .iter() @@ -222,16 +210,33 @@ impl ColorMutation { }) .collect_vec(); - let led_payload = LEDPayload { + Some(LEDPayload { create_effects, update_effects: Vec::new(), delete_effects: Vec::new(), - }; + }) + } else { + None + }; + + update_revision(&mut *tx).await?; + // commit the transaction + tx.commit().await?; + + if let Some(led_payload) = led_payload { Subscriptor::publish(led_payload); } - update_revision(mysql).await?; + let color_payload = ColorPayload { + mutation: ColorMutationMode::Created, + id, + color: Some(color.color.clone()), + color_code: Some(color.color_code.set.clone()), + edit_by: context.user_id, + }; + + Subscriptor::publish(color_payload); let color = Color { id, @@ -248,6 +253,7 @@ impl ColorMutation { let app_state = &context.clients; let mysql = app_state.mysql_pool(); + let mut tx = mysql.begin().await?; let color = sqlx::query_as!( ColorData, @@ -257,7 +263,7 @@ impl ColorMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await?; let check_color = sqlx::query!( @@ -268,7 +274,7 @@ impl ColorMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await? .count; @@ -282,7 +288,7 @@ impl ColorMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await? .count; @@ -301,19 +307,9 @@ impl ColorMutation { "#, id ) - .execute(mysql) + .execute(&mut *tx) .await?; - let color_payload = ColorPayload { - mutation: ColorMutationMode::Deleted, - id, - color: None, - color_code: None, - edit_by: context.user_id, - }; - - Subscriptor::publish(color_payload); - let delete_effects = sqlx::query!( r#" SELECT @@ -329,10 +325,10 @@ impl ColorMutation { "#, id ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; - let delete_effect_futures = delete_effects.iter().map(|effect| { + for effect in delete_effects.iter() { sqlx::query!( r#" DELETE FROM LEDEffect @@ -340,10 +336,24 @@ impl ColorMutation { "#, effect.id ) - .execute(mysql) - }); + .execute(&mut *tx) + .await?; + } + + update_revision(&mut *tx).await?; - futures::future::join_all(delete_effect_futures).await; + // commit the transaction + tx.commit().await?; + + let color_payload = ColorPayload { + mutation: ColorMutationMode::Deleted, + id, + color: None, + color_code: None, + edit_by: context.user_id, + }; + + Subscriptor::publish(color_payload); let delete_effects = delete_effects .iter() @@ -365,8 +375,6 @@ impl ColorMutation { Subscriptor::publish(led_payload); - update_revision(mysql).await?; - Ok(ColorResponse { id, msg: "Color deleted.".to_string(), diff --git a/editor-server/src/graphql/mutations/control_frame.rs b/editor-server/src/graphql/mutations/control_frame.rs index 12ef2b5e9..4c3d005ac 100644 --- a/editor-server/src/graphql/mutations/control_frame.rs +++ b/editor-server/src/graphql/mutations/control_frame.rs @@ -61,7 +61,9 @@ impl ControlFrameMutation { ) -> FieldResult { let context = ctx.data::()?; let clients = context.clients; + let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; // check if the control frame already exists on the start time let exist = sqlx::query!( @@ -75,7 +77,7 @@ impl ControlFrameMutation { "#, start ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await? .exist; @@ -102,7 +104,7 @@ impl ControlFrameMutation { ORDER BY Dancer.id ASC, Part.id ASC; "#, ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; // Use the partition_by_field function to group raw_dancer_data by dancer_id @@ -139,7 +141,7 @@ impl ControlFrameMutation { SELECT id FROM Color ORDER BY id ASC; "#, ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await? .iter() .map(|color_id| color_id.id) @@ -150,7 +152,7 @@ impl ControlFrameMutation { SELECT id FROM LEDEffect ORDER BY id ASC; "#, ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await? .into_iter() .map(|effect_id| effect_id.id) @@ -255,7 +257,7 @@ impl ControlFrameMutation { start, fade ) - .execute(mysql) + .execute(&mut *tx) .await?; let new_control_frame_id = new_control_frame.last_insert_id() as i32; @@ -290,7 +292,7 @@ impl ControlFrameMutation { _data[0], _data[1], ) - .execute(mysql) + .execute(&mut *tx) .await?; } PartType::LED => { @@ -312,7 +314,7 @@ impl ControlFrameMutation { effect_id, alpha, ) - .execute(mysql) + .execute(&mut *tx) .await?; } else { sqlx::query!( @@ -327,7 +329,7 @@ impl ControlFrameMutation { "EFFECT": ControlDataType, alpha, ) - .execute(mysql) + .execute(&mut *tx) .await?; } } @@ -362,7 +364,7 @@ impl ControlFrameMutation { -1, 0, ) - .execute(mysql) + .execute(&mut *tx) .await?; } PartType::LED => { @@ -380,7 +382,7 @@ impl ControlFrameMutation { -1, 0, ) - .execute(mysql) + .execute(&mut *tx) .await?; } } @@ -388,8 +390,14 @@ impl ControlFrameMutation { } } + update_revision(&mut *tx).await?; + // update redis control - update_redis_control(mysql, &clients.redis_client, new_control_frame_id).await?; + update_redis_control(&mut *tx, &clients.redis_client, new_control_frame_id).await?; + + // commit the transaction + tx.commit().await?; + let redis_control = get_redis_control(&clients.redis_client, new_control_frame_id).await?; let create_frames = HashMap::from([(new_control_frame_id.to_string(), redis_control)]); @@ -411,50 +419,6 @@ impl ControlFrameMutation { // publish control map Subscriptor::publish(control_map_payload); - // below is the code for publishing control record - // first, get the index of the new control frame - - let mut index = -1; - - // get all control frames - let all_control_frames = sqlx::query!( - r#" - SELECT id - FROM ControlFrame - ORDER BY start ASC - "# - ) - .fetch_all(mysql) - .await?; - - // get the index of the new control frame - for (i, control_frame) in all_control_frames.iter().enumerate() { - if control_frame.id == new_control_frame_id { - index = i as i32; - break; - } - } - - // if the index is not found, return error - if index == -1 { - return Err(Error::new("Index of the new control frame is not found")); - } - - // create control map payload - let control_record_payload = ControlRecordPayload { - mutation: ControlRecordMutationMode::Created, - index, - add_id: vec![new_control_frame_id], - update_id: vec![], - delete_id: vec![], - edit_by: context.user_id, - }; - - // publish control record - Subscriptor::publish(control_record_payload); - - update_revision(mysql).await?; - // return Ok("ok".to_string()) } @@ -469,7 +433,9 @@ impl ControlFrameMutation { // get the context and clients let context = ctx.data::()?; let clients = context.clients; + let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; // get the input data let frame_id = input.frame_id; @@ -496,7 +462,7 @@ impl ControlFrameMutation { start, frame_id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await? .exist; @@ -531,7 +497,7 @@ impl ControlFrameMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await; // if the original frame is not found, return error @@ -555,7 +521,7 @@ impl ControlFrameMutation { "#, frame_id, ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if is_editing.is_some() { @@ -596,7 +562,7 @@ impl ControlFrameMutation { fade, frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; // update revision of the frame @@ -608,11 +574,17 @@ impl ControlFrameMutation { "#, frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + // update redis control - update_redis_control(mysql, &clients.redis_client, frame_id).await?; + update_redis_control(&mut *tx, &clients.redis_client, frame_id).await?; + + // commit the transaction + tx.commit().await?; + let redis_control = get_redis_control(&clients.redis_client, frame_id).await?; let update_frames = HashMap::from([(frame_id.to_string(), redis_control)]); @@ -634,52 +606,6 @@ impl ControlFrameMutation { // publish control map Subscriptor::publish(control_map_payload); - // below is the code for publishing control record - // first, get the index of the updated control frame - - let mut index = -1; - - // get all control frames - let all_control_frames = sqlx::query!( - r#" - SELECT id - FROM ControlFrame - ORDER BY start ASC - "# - ) - .fetch_all(mysql) - .await?; - - // get the index of the updated control frame - for (i, control_frame) in all_control_frames.iter().enumerate() { - if control_frame.id == frame_id { - index = i as i32; - break; - } - } - - // if the index is not found, return error - if index == -1 { - return Err(Error::new( - "Index of the updated control frame is not found", - )); - } - - // create control map payload - let control_record_payload = ControlRecordPayload { - mutation: ControlRecordMutationMode::Updated, - index, - add_id: vec![], - update_id: vec![frame_id], - delete_id: vec![], - edit_by: context.user_id, - }; - - // publish control record - Subscriptor::publish(control_record_payload); - - update_revision(mysql).await?; - // return Ok("ok".to_string()) } @@ -693,7 +619,9 @@ impl ControlFrameMutation { // get the context and clients let context = ctx.data::()?; let clients = context.clients; + let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; // get the input data let frame_id = input.frame_id; @@ -719,7 +647,7 @@ impl ControlFrameMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await; // if the original frame is not found, return error @@ -743,7 +671,7 @@ impl ControlFrameMutation { "#, frame_id, ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if is_editing.is_some() { @@ -766,11 +694,16 @@ impl ControlFrameMutation { "#, frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + // update redis control - update_redis_control(mysql, &clients.redis_client, frame_id).await?; + update_redis_control(&mut *tx, &clients.redis_client, frame_id).await?; + + // commit the transaction + tx.commit().await?; // below is the code for publishing control map @@ -805,8 +738,6 @@ impl ControlFrameMutation { // publish control record Subscriptor::publish(control_record_payload); - update_revision(mysql).await?; - // return Ok("ok".to_string()) } diff --git a/editor-server/src/graphql/mutations/control_map.rs b/editor-server/src/graphql/mutations/control_map.rs index f57f4d702..e829f614a 100644 --- a/editor-server/src/graphql/mutations/control_map.rs +++ b/editor-server/src/graphql/mutations/control_map.rs @@ -62,7 +62,9 @@ impl ControlMapMutation { // get the context and clients let context = ctx.data::()?; let clients = context.clients; + let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; // get the input data let frame_id = input.frame_id; @@ -93,7 +95,7 @@ impl ControlMapMutation { "#, frame_id, ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; // if the frame does not exist, return error @@ -112,7 +114,7 @@ impl ControlMapMutation { "#, frame_id, ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if is_editing.is_some() { @@ -152,7 +154,7 @@ impl ControlMapMutation { "#, frame_id ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; // Use the partition_by_field function to group raw_dancer_data by dancer_id @@ -191,7 +193,7 @@ impl ControlMapMutation { SELECT id FROM Color ORDER BY id ASC; "#, ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await? .iter() .map(|color_id| color_id.id) @@ -202,7 +204,7 @@ impl ControlMapMutation { SELECT id FROM LEDEffect ORDER BY id ASC; "#, ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await? .into_iter() .map(|effect_id| effect_id.id) @@ -333,7 +335,7 @@ impl ControlMapMutation { part.part_id, dancer_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; } // if the part is LED, update the effect and alpha @@ -354,7 +356,7 @@ impl ControlMapMutation { part.part_id, dancer_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; } else { sqlx::query!( @@ -368,7 +370,7 @@ impl ControlMapMutation { part.part_id, dancer_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; } } @@ -396,7 +398,7 @@ impl ControlMapMutation { fade, frame_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; } None => {} @@ -414,7 +416,7 @@ impl ControlMapMutation { "#, context.user_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; // update revision of the frame @@ -426,11 +428,17 @@ impl ControlMapMutation { "#, frame_id, ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + // update redis - update_redis_control(mysql, &clients.redis_client, frame_id).await?; + update_redis_control(&mut *tx, &clients.redis_client, frame_id).await?; + + // commit the transaction + tx.commit().await?; + let redis_control = get_redis_control(&clients.redis_client, frame_id).await?; let update_frames = HashMap::from([(frame_id.to_string(), redis_control)]); @@ -452,8 +460,6 @@ impl ControlMapMutation { // publish control map Subscriptor::publish(control_map_payload); - update_revision(mysql).await?; - // TODO: check the necessity of publishing the control record (similar to the code in control_frame.rs) // the previous code doesn't implement this diff --git a/editor-server/src/graphql/mutations/dancer.rs b/editor-server/src/graphql/mutations/dancer.rs index 2752ba84d..0236e978b 100644 --- a/editor-server/src/graphql/mutations/dancer.rs +++ b/editor-server/src/graphql/mutations/dancer.rs @@ -48,6 +48,8 @@ impl DancerMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let dancer_name = input.name.clone(); @@ -59,7 +61,7 @@ impl DancerMutation { "#, &dancer_name ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await; if let Ok(dancer) = dancer_result { @@ -75,7 +77,7 @@ impl DancerMutation { "#, &input.model ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await?; let dancer_id = sqlx::query!( @@ -85,10 +87,13 @@ impl DancerMutation { &dancer_name, raw_model.id ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32; + // Commit the transaction + tx.commit().await?; + init_redis_control(mysql, redis).await?; init_redis_position(mysql, redis).await?; @@ -121,6 +126,7 @@ impl DancerMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let dancer_id = input.id; let dancer_name = input.name.clone(); @@ -132,7 +138,7 @@ impl DancerMutation { "#, &dancer_id, ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await; if let Err(_) = raw_dancer { @@ -159,9 +165,12 @@ impl DancerMutation { &dancer_name, &dancer_id.id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + let dancer_payload = DancerPayload { mutation: DancerMutationMode::Updated, dancer_data: Some(Dancer { @@ -191,6 +200,8 @@ impl DancerMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let dancer_id = input.id; @@ -202,7 +213,7 @@ impl DancerMutation { "#, &dancer_id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await; let _dancer = match raw_dancer { @@ -221,11 +232,14 @@ impl DancerMutation { "#, &dancer_id ) - .execute(mysql) + .execute(&mut *tx) .await?; - let _ = init_redis_control(mysql, redis).await; - let _ = init_redis_position(mysql, redis).await; + // Commit the transaction + tx.commit().await?; + + init_redis_control(mysql, redis).await; + init_redis_position(mysql, redis).await; let dancer_payload = DancerPayload { mutation: DancerMutationMode::Deleted, diff --git a/editor-server/src/graphql/mutations/led.rs b/editor-server/src/graphql/mutations/led.rs index 1cc8af5a2..c604a8f21 100644 --- a/editor-server/src/graphql/mutations/led.rs +++ b/editor-server/src/graphql/mutations/led.rs @@ -63,6 +63,7 @@ impl LEDMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let effect_name = input.name.clone(); let model_name = input.model_name.clone(); @@ -91,7 +92,7 @@ impl LEDMutation { &part_name, &model_name, ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { Ok(row) => (row.model_id, row.part_id), @@ -115,7 +116,7 @@ impl LEDMutation { SELECT id FROM Color; "# ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; // check if all color ids in frames are in db @@ -153,7 +154,7 @@ impl LEDMutation { part_id, model_id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { Ok(_) => { @@ -177,7 +178,7 @@ impl LEDMutation { model_id, part_id ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32, }; @@ -195,11 +196,16 @@ impl LEDMutation { led[0], led[1], ) - .execute(mysql) + .execute(&mut *tx) .await?; } } + update_revision(&mut *tx).await?; + + // Commit the transaction + tx.commit().await?; + // publish to subscribers let led_payload = LEDPayload { create_effects: vec![LEDEffectData { @@ -216,8 +222,6 @@ impl LEDMutation { Subscriptor::publish(led_payload); - update_revision(mysql).await?; - Ok(LEDEffectResponse { id: effect_id, model_name, @@ -240,6 +244,7 @@ impl LEDMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let id = input.id; let effect_name = input.name.clone(); @@ -268,7 +273,7 @@ impl LEDMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { Ok(led_effect) => led_effect, @@ -293,7 +298,7 @@ impl LEDMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { if effect.user_id != context.user_id { @@ -310,7 +315,7 @@ impl LEDMutation { SELECT id FROM Color; "# ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; // check if all color ids in frames are in db @@ -348,7 +353,7 @@ impl LEDMutation { &effect_name, id, ) - .execute(mysql) + .execute(&mut *tx) .await?; // update LEDEffectStates @@ -365,11 +370,16 @@ impl LEDMutation { i as i32, id ) - .execute(mysql) + .execute(&mut *tx) .await?; } } + update_revision(&mut *tx).await?; + + // Commit the transaction + tx.commit().await?; + let led_payload = LEDPayload { create_effects: Vec::new(), update_effects: vec![LEDEffectData { @@ -385,8 +395,6 @@ impl LEDMutation { Subscriptor::publish(led_payload); - update_revision(mysql).await?; - Ok(LEDEffectResponse { id, model_name: led_effect.model_name, @@ -409,6 +417,7 @@ impl LEDMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; // check if effect exists let led_effect = match sqlx::query!( @@ -424,7 +433,7 @@ impl LEDMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { Ok(led_effect) => led_effect, @@ -443,7 +452,7 @@ impl LEDMutation { "#, id ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; if !control_frames.is_empty() { @@ -470,7 +479,7 @@ impl LEDMutation { "#, id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await { if effect.user_id != context.user_id { @@ -489,7 +498,7 @@ impl LEDMutation { "#, id ) - .execute(mysql) + .execute(&mut *tx) .await?; // delete from LEDEffect @@ -500,9 +509,14 @@ impl LEDMutation { "#, id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + + // Commit the transaction + tx.commit().await?; + // publish to subscribers let led_payload = LEDPayload { create_effects: Vec::new(), @@ -519,8 +533,6 @@ impl LEDMutation { Subscriptor::publish(led_payload); - update_revision(mysql).await?; - Ok(DeleteLEDEffectResponse { ok: true, msg: "successfully deleted LED effect".to_string(), diff --git a/editor-server/src/graphql/mutations/model.rs b/editor-server/src/graphql/mutations/model.rs index 369d03b9d..f22ffe695 100644 --- a/editor-server/src/graphql/mutations/model.rs +++ b/editor-server/src/graphql/mutations/model.rs @@ -41,6 +41,8 @@ impl ModelMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let model_name = input.name.clone(); @@ -68,11 +70,14 @@ impl ModelMutation { "#, &model_name ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32, }; + // Commit the transaction + tx.commit().await?; + Ok(ModelMutationResponse { ok: true, msg: "Model added".to_string(), @@ -89,6 +94,7 @@ impl ModelMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let model_id = input.id; let model_name = input.name.clone(); @@ -122,9 +128,12 @@ impl ModelMutation { &model_name, &model_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + Ok(ModelMutationResponse { ok: true, msg: "Model updated".to_string(), @@ -141,6 +150,8 @@ impl ModelMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let model_id = input.id; @@ -171,9 +182,12 @@ impl ModelMutation { "#, &model_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + Ok(ModelMutationResponse { ok: true, msg: "Model deleted".to_string(), diff --git a/editor-server/src/graphql/mutations/part.rs b/editor-server/src/graphql/mutations/part.rs index 2744b47be..d348c1a3e 100644 --- a/editor-server/src/graphql/mutations/part.rs +++ b/editor-server/src/graphql/mutations/part.rs @@ -2,9 +2,7 @@ use crate::db::types::{ model::ModelData, part::{PartData, PartType}, - position::PositionData, }; -use crate::graphql::types::dancer::Part; use crate::types::global::UserContext; use crate::utils::data::{init_redis_control, init_redis_position}; @@ -48,6 +46,8 @@ impl PartMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let _check = match input.part_type { @@ -68,7 +68,7 @@ impl PartMutation { "#, input.model_name ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let exist_model = match exist_model { @@ -99,7 +99,7 @@ impl PartMutation { input.name, exist_model.id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if let Some(_part) = exist_part { @@ -120,7 +120,7 @@ impl PartMutation { input.name, "FIBER" ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32, PartType::LED => sqlx::query!( @@ -135,53 +135,17 @@ impl PartMutation { .length .ok_or("length of LED part must be positive number")? ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32, }; - //find dancer parts - let _all_parts = sqlx::query_as!( - Part, - r#" - SELECT * FROM Part - WHERE model_id = ? - ORDER BY id ASC; - "#, - exist_model.id - ) - .fetch_all(mysql) - .await?; - - let _all_dancer_pos = sqlx::query_as!( - PositionData, - r#" - SELECT * FROM PositionData - WHERE dancer_id = ? - ORDER BY frame_id ASC; - "#, - exist_model.id - ) - .fetch_all(mysql) - .await?; + // Commit the transaction + tx.commit().await?; init_redis_control(mysql, redis).await?; init_redis_position(mysql, redis).await?; - // TODO: Query all dancers and notify about the new part. - // let dancer_payload = DancerPayload { - // mutation: DancerMutationMode::Updated, - // dancer_data: Some(Dancer { - // id: dancer.id, - // name: dancer.name.clone(), - // parts: Some(all_parts), - // position_datas: None, - // }), - // edit_by: context.user_id, - // }; - // - // Subscriptor::publish(dancer_payload); - Ok(PartResponse { ok: true, msg: Some("successfully add part".to_string()), @@ -204,6 +168,8 @@ impl PartMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let deleted_part = sqlx::query_as!( @@ -213,7 +179,7 @@ impl PartMutation { "#, input.id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; match deleted_part { @@ -227,59 +193,22 @@ impl PartMutation { } } - let model_data = sqlx::query_as!( - ModelData, - r#" - SELECT Model.* - FROM Part - INNER JOIN Model ON Part.model_id = Model.id - WHERE Part.id = ?; - "#, - deleted_part.unwrap().id - ) - .fetch_optional(mysql) - .await?; - - let _ = sqlx::query!( + sqlx::query!( r#" DELETE FROM Part WHERE id = ?; "#, input.id ) - .execute(mysql) + .execute(&mut *tx) .await?; - //find model parts - let _all_parts = sqlx::query_as!( - Part, - r#" - SELECT * FROM Part - WHERE model_id = ? - ORDER BY id ASC; - "#, - model_data.clone().unwrap().id - ) - .fetch_all(mysql) - .await?; + // Commit the transaction + tx.commit().await?; init_redis_control(mysql, redis).await?; init_redis_position(mysql, redis).await?; - // TODO: Query all dancers and notify about the deleted part. - // let dancer_payload = DancerPayload { - // mutation: DancerMutationMode::Updated, - // dancer_data: Some(Dancer { - // id: dancer.clone().unwrap().id, - // name: dancer.clone().unwrap().name, - // parts: Some(all_parts), - // position_datas: None, - // }), - // edit_by: context.user_id, - // }; - // - // Subscriptor::publish(dancer_payload); - Ok(PartResponse { ok: true, msg: Some("successfully delete part".to_string()), @@ -295,6 +224,7 @@ impl PartMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; let _check = match input.part_type { PartType::FIBER | PartType::LED => true, @@ -314,7 +244,7 @@ impl PartMutation { "#, input.id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let edited_part = match edited_part { @@ -335,7 +265,7 @@ impl PartMutation { "#, edited_part.model_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let model_data = match model_data { @@ -360,35 +290,11 @@ impl PartMutation { input.length, input.id ) - .execute(mysql) - .await?; - - //find dancer parts - let _all_parts = sqlx::query_as!( - Part, - r#" - SELECT * FROM Part - WHERE model_id = ? - ORDER BY id ASC; - "#, - model_data.id - ) - .fetch_all(mysql) + .execute(&mut *tx) .await?; - // TODO: Query all dancers and notify about the edited part. - // let dancer_payload = DancerPayload { - // mutation: DancerMutationMode::Updated, - // dancer_data: Some(Dancer { - // id: dancer.clone().unwrap().id, - // name: dancer.clone().unwrap().name, - // parts: Some(all_parts), - // position_datas: Some(all_dancer_pos), - // }), - // edit_by: context.user_id, - // }; - // - // Subscriptor::publish(dancer_payload); + // Commit the transaction + tx.commit().await?; Ok(PartResponse { ok: true, diff --git a/editor-server/src/graphql/mutations/position_frame.rs b/editor-server/src/graphql/mutations/position_frame.rs index 7a41c04a0..a58c99d96 100644 --- a/editor-server/src/graphql/mutations/position_frame.rs +++ b/editor-server/src/graphql/mutations/position_frame.rs @@ -4,7 +4,6 @@ use crate::db::types::{ dancer::DancerData, editing_position_frame::EditingPositionFrameData, position_frame::PositionFrameData, }; -use crate::graphql::position_record::{PositionRecordMutationMode, PositionRecordPayload}; use crate::graphql::subscriptions::position_map::PositionMapPayload; use crate::graphql::subscriptor::Subscriptor; use crate::graphql::types::pos_data::{FrameData, PosDataScalar}; @@ -43,6 +42,8 @@ impl PositionFrameMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let check = sqlx::query_as!( @@ -53,7 +54,7 @@ impl PositionFrameMutation { "#, start ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if let Some(check) = check { @@ -71,7 +72,7 @@ impl PositionFrameMutation { ORDER BY id ASC; "# ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; match &position_data { @@ -108,7 +109,7 @@ impl PositionFrameMutation { "#, start ) - .execute(mysql) + .execute(&mut *tx) .await? .last_insert_id() as i32; @@ -128,7 +129,7 @@ impl PositionFrameMutation { coor[1], coor[2] ) - .execute(mysql) + .execute(&mut *tx) .await?; } @@ -158,13 +159,18 @@ impl PositionFrameMutation { 0.0, 0.0 ) - .execute(mysql) + .execute(&mut *tx) .await?; } } } - update_redis_position(mysql, redis, id).await?; + update_revision(&mut *tx).await?; + + update_redis_position(&mut *tx, redis, id).await?; + + // Commit the transaction + tx.commit().await?; // subscription let map_payload = PositionMapPayload { @@ -178,37 +184,6 @@ impl PositionFrameMutation { Subscriptor::publish(map_payload); - let all_position_frames = sqlx::query_as!( - PositionFrameData, - r#" - SELECT * FROM PositionFrame - ORDER BY start ASC; - "# - ) - .fetch_all(mysql) - .await?; - - let mut index = -1; - for (idx, frame) in all_position_frames.iter().enumerate() { - if frame.id == id { - index = idx as i32; - break; - } - } - - let record_payload = PositionRecordPayload { - mutation: PositionRecordMutationMode::Created, - edit_by: context.user_id, - add_id: vec![id], - update_id: vec![], - delete_id: vec![], - index, - }; - - Subscriptor::publish(record_payload); - - update_revision(mysql).await?; - Ok(PositionFrame { id, start, @@ -224,6 +199,8 @@ impl PositionFrameMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let check = sqlx::query_as!( @@ -234,7 +211,7 @@ impl PositionFrameMutation { "#, input.start ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if let Some(check) = check { @@ -255,7 +232,7 @@ impl PositionFrameMutation { "#, input.frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if let Some(frame_to_edit) = frame_to_edit { @@ -274,7 +251,7 @@ impl PositionFrameMutation { "#, input.frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; match position_frame { @@ -294,7 +271,7 @@ impl PositionFrameMutation { input.start, input.frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; let _ = sqlx::query_as!( @@ -306,7 +283,7 @@ impl PositionFrameMutation { "#, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; // update revision of the frame @@ -319,11 +296,16 @@ impl PositionFrameMutation { "#, input.frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + let position_frame = position_frame.unwrap(); - update_redis_position(mysql, redis, position_frame.id).await?; + update_redis_position(&mut *tx, redis, position_frame.id).await?; + + // Commit the transaction + tx.commit().await?; let redis_position = get_redis_position(redis, position_frame.id).await?; let update_frames = HashMap::from([(position_frame.id.to_string(), redis_position)]); @@ -340,37 +322,6 @@ impl PositionFrameMutation { Subscriptor::publish(payload); - let all_position_frames = sqlx::query_as!( - PositionFrameData, - r#" - SELECT * FROM PositionFrame - ORDER BY start ASC; - "#, - ) - .fetch_all(mysql) - .await?; - - let mut index = -1; - for (idx, frame) in all_position_frames.iter().enumerate() { - if frame.id == position_frame.id { - index = idx as i32; - break; - } - } - - let record_payload = PositionRecordPayload { - mutation: PositionRecordMutationMode::Updated, - edit_by: context.user_id, - add_id: vec![], - update_id: vec![position_frame.id], - delete_id: vec![], - index, - }; - - Subscriptor::publish(record_payload); - - update_revision(mysql).await?; - Ok(PositionFrame { id: input.frame_id, start: input.start, @@ -389,6 +340,8 @@ impl PositionFrameMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); let frame_to_delete = sqlx::query_as!( @@ -399,7 +352,7 @@ impl PositionFrameMutation { "#, input.frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if let Some(frame_to_delete) = frame_to_delete { @@ -418,7 +371,7 @@ impl PositionFrameMutation { "#, input.frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if deleted_frame.is_none() { @@ -433,7 +386,7 @@ impl PositionFrameMutation { "#, input.frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; let _ = sqlx::query_as!( @@ -445,9 +398,16 @@ impl PositionFrameMutation { "#, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + update_revision(&mut *tx).await?; + + // Commit the transaction + tx.commit().await?; + + delete_redis_position(redis, input.frame_id).await?; + // subscription let map_payload = PositionMapPayload { edit_by: context.user_id, @@ -457,21 +417,8 @@ impl PositionFrameMutation { update_frames: HashMap::new(), }), }; - delete_redis_position(redis, input.frame_id).await?; Subscriptor::publish(map_payload); - let record_payload = PositionRecordPayload { - mutation: PositionRecordMutationMode::Deleted, - add_id: vec![], - update_id: vec![], - delete_id: vec![input.frame_id], - edit_by: context.user_id, - index: -1, - }; - Subscriptor::publish(record_payload); - - update_revision(mysql).await?; - let deleted_frame = deleted_frame.unwrap(); Ok(PositionFrame { diff --git a/editor-server/src/graphql/mutations/position_map.rs b/editor-server/src/graphql/mutations/position_map.rs index 60b111ea3..d59400b87 100644 --- a/editor-server/src/graphql/mutations/position_map.rs +++ b/editor-server/src/graphql/mutations/position_map.rs @@ -38,6 +38,8 @@ impl PositionMapMutation { let clients = context.clients; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + let redis = clients.redis_client(); //check payload correctness @@ -49,7 +51,7 @@ impl PositionMapMutation { "#, input.frame_id ) - .fetch_one(mysql) + .fetch_one(&mut *tx) .await?; let editing = sqlx::query_as!( @@ -60,7 +62,7 @@ impl PositionMapMutation { "#, frame_to_edit.id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; // to check if the frame is editing by other user @@ -77,7 +79,7 @@ impl PositionMapMutation { ORDER BY id ASC; "# ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; if input.position_data.len() != dancers.len() { @@ -103,7 +105,7 @@ impl PositionMapMutation { dancer.id, frame_to_edit.id ) - .execute(mysql) + .execute(&mut *tx) .await?; } @@ -116,10 +118,16 @@ impl PositionMapMutation { "#, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; - update_redis_position(mysql, redis, frame_to_edit.id).await?; + update_revision(&mut *tx).await?; + + update_redis_position(&mut *tx, redis, frame_to_edit.id).await?; + + // Commit the transaction + tx.commit().await?; + let redis_position = get_redis_position(redis, frame_to_edit.id).await?; let update_frames = HashMap::from([(frame_to_edit.id.to_string(), redis_position)]); @@ -138,11 +146,12 @@ impl PositionMapMutation { let frame_ids = sqlx::query_as!( MapID, r#" - SELECT id FROM PositionFrame; - "# + SELECT id FROM PositionFrame; + "# ) .fetch_all(mysql) .await?; + let mut result: HashMap = HashMap::new(); for frame_id in frame_ids { result.insert( @@ -151,8 +160,6 @@ impl PositionMapMutation { ); } - update_revision(mysql).await?; - Ok(PositionMap { frame_ids: PositionMapScalar(result), }) diff --git a/editor-server/src/graphql/mutations/request_edit.rs b/editor-server/src/graphql/mutations/request_edit.rs index ed54d0b8a..4d9a174ff 100644 --- a/editor-server/src/graphql/mutations/request_edit.rs +++ b/editor-server/src/graphql/mutations/request_edit.rs @@ -29,8 +29,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let check_editing_control_frame = sqlx::query_as!( EditingControlFrameData, @@ -40,7 +39,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let exist_frame = sqlx::query!( @@ -50,7 +49,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_frame.is_none() { @@ -58,7 +57,7 @@ impl RequestEditMutation { } // update EditingControlFrame - match check_editing_control_frame { + let response = match check_editing_control_frame { Some(editing) => { if editing.user_id != context.user_id { return Ok(RequestEditResponse { @@ -81,15 +80,21 @@ impl RequestEditMutation { frame_id, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; Ok(RequestEditResponse { editing: Some(context.user_id), ok: true, }) } - } + }; + + // Commit the transaction + tx.commit().await?; + + response } + #[graphql(name = "RequestEditPosition")] async fn request_edit_position( &self, @@ -100,8 +105,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let check_editing_position_frame = sqlx::query_as!( EditingPositionFrameData, @@ -111,7 +115,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let exist_frame = sqlx::query_as!( @@ -122,7 +126,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_frame.is_none() { @@ -130,7 +134,7 @@ impl RequestEditMutation { } // update EditingPositionFrame - match check_editing_position_frame { + let response = match check_editing_position_frame { Some(editing) => { if editing.user_id != context.user_id { return Ok(RequestEditResponse { @@ -153,14 +157,19 @@ impl RequestEditMutation { frame_id, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; Ok(RequestEditResponse { editing: Some(context.user_id), ok: true, }) } - } + }; + + // Commit the transaction + tx.commit().await?; + + response } #[graphql(name = "RequestEditLEDEffect")] @@ -173,8 +182,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let check_editing_led_effect = sqlx::query_as!( EditingLEDEffectData, @@ -184,7 +192,7 @@ impl RequestEditMutation { "#, led_effect_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; let exist_led_effect = sqlx::query_as!( @@ -195,13 +203,13 @@ impl RequestEditMutation { "#, led_effect_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_led_effect.is_none() { return Err(format!("frame id {} not found", led_effect_id).into()); } - match check_editing_led_effect { + let response = match check_editing_led_effect { Some(editing) => { if editing.user_id != context.user_id { return Ok(RequestEditResponse { @@ -224,14 +232,19 @@ impl RequestEditMutation { led_effect_id, context.user_id ) - .execute(mysql) + .execute(&mut *tx) .await?; Ok(RequestEditResponse { editing: Some(context.user_id), ok: true, }) } - } + }; + + // Commit the transaction + tx.commit().await?; + + response } #[graphql(name = "CancelEditPosition")] @@ -244,8 +257,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let exist_frame = sqlx::query_as!( PositionFrameData, @@ -255,7 +267,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_frame.is_none() { @@ -270,9 +282,12 @@ impl RequestEditMutation { "#, frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + Ok(RequestEditResponse { editing: None, ok: true, @@ -289,8 +304,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let exist_frame = sqlx::query!( r#" @@ -299,7 +313,7 @@ impl RequestEditMutation { "#, frame_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_frame.is_none() { @@ -314,9 +328,12 @@ impl RequestEditMutation { "#, frame_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + Ok(RequestEditResponse { editing: None, ok: true, @@ -333,8 +350,7 @@ impl RequestEditMutation { let clients = context.clients; let mysql = clients.mysql_pool(); - #[allow(unused_variables)] - let redis = clients.redis_client(); + let mut tx = mysql.begin().await?; let exist_led_effect = sqlx::query_as!( LEDEffectData, @@ -344,7 +360,7 @@ impl RequestEditMutation { "#, led_effect_id ) - .fetch_optional(mysql) + .fetch_optional(&mut *tx) .await?; if exist_led_effect.is_none() { @@ -359,9 +375,12 @@ impl RequestEditMutation { "#, led_effect_id ) - .execute(mysql) + .execute(&mut *tx) .await?; + // Commit the transaction + tx.commit().await?; + Ok(RequestEditResponse { editing: None, ok: true, diff --git a/editor-server/src/graphql/mutations/shift.rs b/editor-server/src/graphql/mutations/shift.rs index 8b520e178..97283169b 100644 --- a/editor-server/src/graphql/mutations/shift.rs +++ b/editor-server/src/graphql/mutations/shift.rs @@ -44,6 +44,9 @@ impl FrameMutation { let redis_client = &clients.redis_client; let mysql = clients.mysql_pool(); + let mut tx = mysql.begin().await?; + + println!("Shift start: {}, end: {}, mv: {}", start, end, mv); //check negative if start + mv < 0 { @@ -87,7 +90,7 @@ impl FrameMutation { check_start, check_end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; if !editing_control_frame.is_empty() { for editing in editing_control_frame { @@ -116,7 +119,7 @@ impl FrameMutation { check_start, check_end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; if !editing_position_frame.is_empty() { for editing in editing_position_frame { @@ -133,7 +136,7 @@ impl FrameMutation { } } - if shift_control { + let control_map_payload = if shift_control { //clear overlap interval let delete_control_frames = sqlx::query_as!( ControlFrameData, @@ -151,7 +154,7 @@ impl FrameMutation { overlap_start, overlap_end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; let _ = sqlx::query!( r#" @@ -162,7 +165,7 @@ impl FrameMutation { overlap_start, overlap_end ) - .execute(mysql) + .execute(&mut *tx) .await?; //get source data @@ -185,7 +188,7 @@ impl FrameMutation { start, end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; } else { update_control_frames = sqlx::query_as!( @@ -205,7 +208,7 @@ impl FrameMutation { start, end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; } //update database and redis @@ -221,9 +224,9 @@ impl FrameMutation { control_frame.start + mv, control_frame.start ) - .execute(mysql) + .execute(&mut *tx) .await?; - let result = update_redis_control(mysql, redis_client, control_frame.id).await; + let result = update_redis_control(&mut *tx, redis_client, control_frame.id).await; match result { Ok(_) => (), Err(msg) => return Err(GQLError::new(msg)), @@ -256,52 +259,19 @@ impl FrameMutation { update_control_frames.insert(id.to_string(), redis_control); } - let control_map_payload = ControlMapPayload { + Some(ControlMapPayload { edit_by: context.user_id, frame: ControlFramesSubDatScalar(ControlFramesSubData { create_frames: HashMap::new(), delete_frames: delete_control_list.clone(), update_frames: update_control_frames, }), - }; - Subscriptor::publish(control_map_payload); - - // NOTE: Not used in frontend - // let all_control_frames = sqlx::query_as!( - // ControlFrameData, - // r#" - // SELECT - // id, - // start, - // fade as "fade: bool", - // meta_rev, - // data_rev - // FROM ControlFrame - // ORDER BY start ASC; - // "# - // ) - // .fetch_all(mysql) - // .await?; - // let mut index = -1; - // - // for (idx, frame) in all_control_frames.iter().enumerate() { - // if frame.id == update_control_ids.clone()[0] { - // index = idx as i32; - // } - // } - // - // let control_record_payload = ControlRecordPayload { - // mutation: ControlRecordMutationMode::UpdatedDeleted, - // edit_by: context.user_id, - // add_id: Vec::new(), - // delete_id: delete_control_list.clone(), - // update_id: update_control_ids.clone(), - // index, - // }; - // Subscriptor::publish(control_record_payload); - } + }) + } else { + None + }; - if shift_position { + let position_map_payload = if shift_position { //clear overlap interval let delete_position_frames = sqlx::query_as!( PositionFrameData, @@ -313,7 +283,7 @@ impl FrameMutation { overlap_start, overlap_end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; let _ = sqlx::query!( r#" @@ -324,7 +294,7 @@ impl FrameMutation { overlap_start, overlap_end ) - .execute(mysql) + .execute(&mut *tx) .await?; //get source data @@ -341,7 +311,7 @@ impl FrameMutation { start, end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; } else { update_position_frames = sqlx::query_as!( @@ -355,7 +325,7 @@ impl FrameMutation { start, end ) - .fetch_all(mysql) + .fetch_all(&mut *tx) .await?; } //update database and redis @@ -371,9 +341,10 @@ impl FrameMutation { position_frame.start + mv, position_frame.start ) - .execute(mysql) + .execute(&mut *tx) .await?; - let result = update_redis_position(mysql, redis_client, position_frame.id).await; + + let result = update_redis_position(&mut *tx, redis_client, position_frame.id).await; match result { Ok(_) => (), Err(msg) => return Err(GQLError::new(msg)), @@ -408,45 +379,31 @@ impl FrameMutation { }; update_position_frames.insert(id.to_string(), redis_position); } + //subscription - let position_map_payload = PositionMapPayload { + Some(PositionMapPayload { edit_by: context.user_id, frame: PosDataScalar(FrameData { create_frames: HashMap::new(), delete_frames: delete_position_ids, update_frames: update_position_frames, }), - }; - Subscriptor::publish(position_map_payload); + }) + } else { + None + }; - // NOTE: Not used in frontend - // let all_position_frames = sqlx::query_as!( - // PositionFrameData, - // r#" - // SELECT * FROM PositionFrame - // ORDER BY start ASC; - // "# - // ) - // .fetch_all(mysql) - // .await?; - // let mut index = -1; - // for (idx, frame) in all_position_frames.iter().enumerate() { - // if frame.id == update_position_ids.clone()[0] { - // index = idx as i32; - // } - // } - // let position_record_payload = PositionRecordPayload { - // mutation: PositionRecordMutationMode::UpdatedDeleted, - // edit_by: context.user_id, - // add_id: Vec::new(), - // delete_id: delete_position_list.clone(), - // update_id: update_position_ids.clone(), - // index, - // }; - // Subscriptor::publish(position_record_payload); - } + update_revision(&mut *tx).await?; + + // Commit the transaction + tx.commit().await?; - update_revision(mysql).await?; + if let Some(control_map_payload) = control_map_payload { + Subscriptor::publish(control_map_payload); + } + if let Some(position_map_payload) = position_map_payload { + Subscriptor::publish(position_map_payload); + } Ok(ShiftResponse { msg: "Shift success".to_string(), diff --git a/editor-server/src/utils/data.rs b/editor-server/src/utils/data.rs index 61a3915b1..4baae1a5f 100644 --- a/editor-server/src/utils/data.rs +++ b/editor-server/src/utils/data.rs @@ -10,22 +10,7 @@ use redis::AsyncCommands; use redis::Client; use sqlx::{MySql, Pool}; -pub async fn init_data(mysql: &Pool) -> Result<(), sqlx::Error> { - sqlx::query!( - r#" - DELETE FROM User; - "#, - ) - .execute(mysql) - .await?; - - Ok(()) -} - -pub async fn init_redis_control( - mysql_pool: &Pool, - redis_client: &Client, -) -> Result<(), String> { +pub async fn init_redis_control(mysql: &Pool, redis_client: &Client) -> Result<(), String> { let envs = global::envs::get(); let frames = sqlx::query!( @@ -38,7 +23,7 @@ pub async fn init_redis_control( ON EditingControlFrame.user_id = User.id; "#, ) - .fetch_all(mysql_pool) + .fetch_all(mysql) .await .map_err(|e| e.to_string())?; @@ -68,7 +53,7 @@ pub async fn init_redis_control( ORDER BY ControlData.frame_id, Dancer.id ASC, Part.id ASC; "#, ) - .fetch_all(mysql_pool) + .fetch_all(mysql) .await .map_err(|e| e.to_string())?; @@ -133,10 +118,7 @@ pub async fn init_redis_control( Ok(()) } -pub async fn init_redis_position( - mysql_pool: &Pool, - redis_client: &Client, -) -> Result<(), String> { +pub async fn init_redis_position(mysql: &Pool, redis_client: &Client) -> Result<(), String> { let envs = global::envs::get(); let frames = sqlx::query!( @@ -149,7 +131,7 @@ pub async fn init_redis_position( ON EditingPositionFrame.user_id = User.id; "#, ) - .fetch_all(mysql_pool) + .fetch_all(mysql) .await .map_err(|e| e.to_string())?; @@ -170,7 +152,7 @@ pub async fn init_redis_position( ORDER BY Dancer.id ASC; "#, ) - .fetch_all(mysql_pool) + .fetch_all(mysql) .await .map_err(|e| e.to_string())?; @@ -215,11 +197,14 @@ pub async fn init_redis_position( Ok(()) } -pub async fn update_redis_control( - mysql_pool: &Pool, +pub async fn update_redis_control( + executor: &mut E, redis_client: &Client, frame_id: i32, -) -> Result<(), String> { +) -> Result<(), String> +where + for<'e> &'e mut E: sqlx::Executor<'e, Database = sqlx::MySql>, +{ let envs = global::envs::get(); let frame = sqlx::query!( @@ -234,7 +219,7 @@ pub async fn update_redis_control( "#, frame_id ) - .fetch_optional(mysql_pool) + .fetch_optional(&mut *executor) .await .map_err(|e| e.to_string())?; @@ -266,7 +251,7 @@ pub async fn update_redis_control( "#, frame.id ) - .fetch_all(mysql_pool) + .fetch_all(&mut *executor) .await .map_err(|e| e.to_string())?; @@ -312,11 +297,14 @@ pub async fn update_redis_control( Ok(()) } -pub async fn update_redis_position( - mysql_pool: &Pool, +pub async fn update_redis_position( + executor: &mut E, redis_client: &Client, frame_id: i32, -) -> Result<(), String> { +) -> Result<(), String> +where + for<'e> &'e mut E: sqlx::Executor<'e, Database = sqlx::MySql>, +{ let envs = global::envs::get(); let frame = sqlx::query!( @@ -331,7 +319,7 @@ pub async fn update_redis_position( "#, frame_id ) - .fetch_optional(mysql_pool) + .fetch_optional(&mut *executor) .await .map_err(|e| e.to_string())?; @@ -357,7 +345,7 @@ pub async fn update_redis_position( "#, frame_id ) - .fetch_all(mysql_pool) + .fetch_all(&mut *executor) .await .map_err(|e| e.to_string())?; diff --git a/editor-server/src/utils/revision.rs b/editor-server/src/utils/revision.rs index 8834e7e33..a3b7fe87c 100644 --- a/editor-server/src/utils/revision.rs +++ b/editor-server/src/utils/revision.rs @@ -2,7 +2,11 @@ use crate::types::global::DBRevision; use uuid::Uuid; -pub async fn update_revision(mysql: &sqlx::MySqlPool) -> Result<(), sqlx::Error> { +pub async fn update_revision<'e, 'c, E>(executor: E) -> Result<(), sqlx::Error> +where + 'c: 'e, + E: sqlx::Executor<'c, Database = sqlx::MySql>, +{ let uuid = Uuid::new_v4(); sqlx::query!( @@ -12,13 +16,17 @@ pub async fn update_revision(mysql: &sqlx::MySqlPool) -> Result<(), sqlx::Error> "#, uuid.to_string(), ) - .execute(mysql) + .execute(executor) .await?; Ok(()) } -pub async fn get_revision(mysql: &sqlx::MySqlPool) -> Result { +pub async fn get_revision<'e, 'c, E>(executor: E) -> Result +where + 'c: 'e, + E: sqlx::Executor<'c, Database = sqlx::MySql>, +{ let revision = sqlx::query!( r#" SELECT uuid, time @@ -26,7 +34,7 @@ pub async fn get_revision(mysql: &sqlx::MySqlPool) -> Result