Skip to content

Commit

Permalink
use transaction in mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
tom1484 committed Mar 19, 2024
1 parent ef18fb4 commit b1711e5
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 617 deletions.
176 changes: 92 additions & 84 deletions editor-server/src/graphql/mutations/color.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl ColorMutation {
let clients = context.clients;

let mysql = clients.mysql_pool();
let mut tx = mysql.begin().await?;

let _ = sqlx::query!(
r#"
Expand All @@ -76,22 +77,24 @@ impl ColorMutation {
data.color_code.set[2],
id
)
.execute(mysql)
.execute(&mut *tx)
.await?;

update_revision(&mut *tx).await?;

// commit the transaction
tx.commit().await?;

let color_payload = ColorPayload {
mutation: ColorMutationMode::Updated,
id,
color: Some(data.color.set.clone()),
color_code: Some(data.color_code.set.clone()),
edit_by: context.user_id,
// edit_by: 0,
};

Subscriptor::publish(color_payload);

update_revision(mysql).await?;

let color = Color {
id,
color: data.color.set,
Expand All @@ -106,6 +109,7 @@ impl ColorMutation {
let clients = context.clients;

let mysql = clients.mysql_pool();
let mut tx = mysql.begin().await?;

let id = sqlx::query!(
r#"
Expand All @@ -117,21 +121,11 @@ impl ColorMutation {
color.color_code.set[1],
color.color_code.set[2]
)
.execute(mysql)
.execute(&mut *tx)
.await?
.last_insert_id() as i32;

let color_payload = ColorPayload {
mutation: ColorMutationMode::Created,
id,
color: Some(color.color.clone()),
color_code: Some(color.color_code.set.clone()),
edit_by: context.user_id,
};

Subscriptor::publish(color_payload);

if color.auto_create_effect.unwrap_or(false) {
let led_payload = if color.auto_create_effect.unwrap_or(false) {
let model_parts = sqlx::query!(
r#"
SELECT
Expand All @@ -145,7 +139,7 @@ impl ColorMutation {
WHERE Part.type = 'LED';
"#
)
.fetch_all(mysql)
.fetch_all(&mut *tx)
.await?
.into_iter()
.map(|row| {
Expand All @@ -154,49 +148,43 @@ impl ColorMutation {
})
.collect_vec();

let create_effect_futures = model_parts.iter().map(|model_part| {
sqlx::query!(
r#"
INSERT INTO LEDEffect (name, model_id, part_id)
VALUES (?, ?, ?);
"#,
color.color.clone(),
model_part.0,
model_part.1
)
.execute(mysql)
});

let create_effect_results = futures::future::join_all(create_effect_futures)
.await
.into_iter()
.filter_map(|result| result.ok())
.collect_vec();

let mut create_states_futures = Vec::new();
create_effect_results
.iter()
.zip(&model_parts)
.for_each(|(result, model_part)| {
for pos in 0..model_part.2 {
create_states_futures.push(
sqlx::query!(
r#"
INSERT INTO LEDEffectState (effect_id, position, color_id, alpha)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE color_id = VALUES(color_id), alpha = VALUES(alpha);
"#,
result.last_insert_id() as i32,
pos,
id,
255
)
.execute(mysql),
)
}
});

futures::future::join_all(create_states_futures).await;
let mut create_effect_results = Vec::new();
for model_part in model_parts.iter() {
create_effect_results.push(
sqlx::query!(
r#"
INSERT INTO LEDEffect (name, model_id, part_id)
VALUES (?, ?, ?);
"#,
color.color.clone(),
model_part.0,
model_part.1
)
.execute(&mut *tx)
.await?,
);
}

let mut create_states_results = Vec::new();
for (result, model_part) in create_effect_results.iter().zip(&model_parts) {
for pos in 0..model_part.2 {
create_states_results.push(
sqlx::query!(
r#"
INSERT INTO LEDEffectState (effect_id, position, color_id, alpha)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE color_id = VALUES(color_id), alpha = VALUES(alpha);
"#,
result.last_insert_id() as i32,
pos,
id,
255
)
.execute(&mut *tx)
.await?,
)
}
}

let create_effects = create_effect_results
.iter()
Expand All @@ -222,16 +210,33 @@ impl ColorMutation {
})
.collect_vec();

let led_payload = LEDPayload {
Some(LEDPayload {
create_effects,
update_effects: Vec::new(),
delete_effects: Vec::new(),
};
})
} else {
None
};

update_revision(&mut *tx).await?;

// commit the transaction
tx.commit().await?;

if let Some(led_payload) = led_payload {
Subscriptor::publish(led_payload);
}

update_revision(mysql).await?;
let color_payload = ColorPayload {
mutation: ColorMutationMode::Created,
id,
color: Some(color.color.clone()),
color_code: Some(color.color_code.set.clone()),
edit_by: context.user_id,
};

Subscriptor::publish(color_payload);

let color = Color {
id,
Expand All @@ -248,6 +253,7 @@ impl ColorMutation {
let app_state = &context.clients;

let mysql = app_state.mysql_pool();
let mut tx = mysql.begin().await?;

let color = sqlx::query_as!(
ColorData,
Expand All @@ -257,7 +263,7 @@ impl ColorMutation {
"#,
id
)
.fetch_one(mysql)
.fetch_one(&mut *tx)
.await?;

let check_color = sqlx::query!(
Expand All @@ -268,7 +274,7 @@ impl ColorMutation {
"#,
id
)
.fetch_one(mysql)
.fetch_one(&mut *tx)
.await?
.count;

Expand All @@ -282,7 +288,7 @@ impl ColorMutation {
"#,
id
)
.fetch_one(mysql)
.fetch_one(&mut *tx)
.await?
.count;

Expand All @@ -301,19 +307,9 @@ impl ColorMutation {
"#,
id
)
.execute(mysql)
.execute(&mut *tx)
.await?;

let color_payload = ColorPayload {
mutation: ColorMutationMode::Deleted,
id,
color: None,
color_code: None,
edit_by: context.user_id,
};

Subscriptor::publish(color_payload);

let delete_effects = sqlx::query!(
r#"
SELECT
Expand All @@ -329,21 +325,35 @@ impl ColorMutation {
"#,
id
)
.fetch_all(mysql)
.fetch_all(&mut *tx)
.await?;

let delete_effect_futures = delete_effects.iter().map(|effect| {
for effect in delete_effects.iter() {
sqlx::query!(
r#"
DELETE FROM LEDEffect
WHERE id = ?;
"#,
effect.id
)
.execute(mysql)
});
.execute(&mut *tx)
.await?;
}

update_revision(&mut *tx).await?;

futures::future::join_all(delete_effect_futures).await;
// commit the transaction
tx.commit().await?;

let color_payload = ColorPayload {
mutation: ColorMutationMode::Deleted,
id,
color: None,
color_code: None,
edit_by: context.user_id,
};

Subscriptor::publish(color_payload);

let delete_effects = delete_effects
.iter()
Expand All @@ -365,8 +375,6 @@ impl ColorMutation {

Subscriptor::publish(led_payload);

update_revision(mysql).await?;

Ok(ColorResponse {
id,
msg: "Color deleted.".to_string(),
Expand Down
Loading

0 comments on commit b1711e5

Please sign in to comment.