Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/fix(torii-core): correctly queue entity deletions #2428

Merged
merged 7 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
let entity_id = event.data[ENTITY_ID_INDEX];
let entity = model.schema;

db.delete_entity(entity_id, entity, event_id, block_timestamp).await?;
db.delete_entity(entity_id, selector, entity, event_id, block_timestamp).await?;

Ok(())
}
Expand Down
57 changes: 48 additions & 9 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,18 @@ pub struct QueryQueue {
pub publish_queue: VecDeque<BrokerMessage>,
}

#[derive(Debug, Clone)]
pub struct DeleteEntityQuery {
pub entity_id: String,
pub event_id: String,
pub block_timestamp: String,
pub entity: Ty,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
Other,
}

Expand All @@ -57,15 +66,6 @@ impl QueryQueue {
self.queue.push_back((statement.into(), arguments, query_type));
}

pub fn push_front<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_front((statement.into(), arguments, query_type));
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
}
Expand Down Expand Up @@ -97,6 +97,45 @@ impl QueryQueue {
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
if delete_model.rows_affected() == 0 {
continue;
}

let row = sqlx::query(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *",
)
.bind(entity.block_timestamp)
.bind(entity.event_id)
.bind(entity.entity_id)
.fetch_one(&mut *tx)
.await?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity.entity);

let count = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
)
.bind(entity_updated.id.clone())
.fetch_one(&mut *tx)
.await?;
Comment on lines +120 to +125
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To have the code organized around that we should move (in the future, not this PR) to some functions that could make this less verbose and easier to re-use, especially counting or generic stuff like so.


// Delete entity if all of its models are deleted
if count == 0 {
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(entity_updated.id.clone())
.execute(&mut *tx)
.await?;
entity_updated.deleted = true;
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down
67 changes: 18 additions & 49 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Ty};
use dojo_world::contracts::abi::model::Layout;
use dojo_world::contracts::naming::{compute_selector_from_names, compute_selector_from_tag};
use dojo_world::contracts::naming::compute_selector_from_names;
use dojo_world::metadata::WorldMetadata;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
Expand All @@ -16,10 +16,9 @@ use starknet_crypto::poseidon_hash_many;
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

Expand Down Expand Up @@ -281,6 +280,7 @@ impl Sql {
pub async fn delete_entity(
&mut self,
entity_id: Felt,
model_id: Felt,
entity: Ty,
event_id: &str,
block_timestamp: u64,
Expand All @@ -289,49 +289,18 @@ impl Sql {
let path = vec![entity.name()];
// delete entity models data
self.build_delete_entity_queries_recursive(path, &entity_id, &entity);
self.execute().await?;

let deleted_entity_model =
sqlx::query("DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?")
.bind(&entity_id)
.bind(format!("{:#x}", compute_selector_from_tag(&entity.name())))
.execute(&self.pool)
.await?;
if deleted_entity_model.rows_affected() == 0 {
// fail silently. we have no entity-model relation to delete.
// this can happen if a entity model that doesnt exist
// got deleted
return Ok(());
}

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;
update_entity.updated_model = Some(entity.clone());

let models_count =
sqlx::query_scalar::<_, u32>("SELECT count(*) FROM entity_model WHERE entity_id = ?")
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;

if models_count == 0 {
// delete entity
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(&entity_id)
.execute(&self.pool)
.await?;

update_entity.deleted = true;
}
self.query_queue.enqueue(
"DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?",
vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))],
QueryType::DeleteEntity(DeleteEntityQuery {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
entity: entity.clone(),
}),
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
Ok(())
}

Expand Down Expand Up @@ -797,7 +766,7 @@ impl Sql {
Ty::Struct(s) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -818,7 +787,7 @@ impl Sql {

let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -839,7 +808,7 @@ impl Sql {
Ty::Array(array) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -854,7 +823,7 @@ impl Sql {
Ty::Tuple(t) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand Down
Loading