From 1265022ca3028654ad8bc810edd7be46dc932471 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 13:50:30 +0200 Subject: [PATCH 01/28] Prepare pending_tasks method --- aquadoggo/src/materializer/service.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index befaef69f..5112c2b06 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -13,6 +13,12 @@ use crate::materializer::TaskInput; const CHANNEL_CAPACITY: usize = 1024; +/// The materializer service waits for incoming new operations to transform them into actual useful +/// application and system data, like document views or schemas. +/// +/// Internally the service uses a task queue which gives us the right architecture to deal with +/// operations coming in random order and avoid race-conditions which would occure otherwise when +/// working on the same data in separate threads. pub async fn materializer_service( context: Context, shutdown: Shutdown, @@ -33,6 +39,11 @@ pub async fn materializer_service( // Subscribe to communication bus let mut rx = tx.subscribe(); + // Reschedule tasks from last time which did not complete + pending_tasks().await?.iter().for_each(|task| { + factory.queue(task.to_owned()); + }); + // Listen to incoming new entries and operations and move them into task queue let handle = task::spawn(async move { while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { @@ -67,3 +78,12 @@ pub async fn materializer_service( Ok(()) } + +/// Retreives a list of pending tasks from the database and returns them as inputs for the task +/// queue. +/// +/// This list represents all tasks which were not completed during the last runtime, as the node +/// exited before. +async fn pending_tasks() -> Result>> { + Ok(vec![]) +} From ed25e109fab690d4d76e4a5aa5333e7931e435ec Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 14:05:15 +0200 Subject: [PATCH 02/28] Add tasks table --- .../migrations/20220617115933_create-tasks.sql | 7 +++++++ aquadoggo/src/db/models/mod.rs | 2 ++ aquadoggo/src/db/models/task.rs | 17 +++++++++++++++++ aquadoggo/src/materializer/service.rs | 9 ++++++--- 4 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 aquadoggo/migrations/20220617115933_create-tasks.sql create mode 100644 aquadoggo/src/db/models/task.rs diff --git a/aquadoggo/migrations/20220617115933_create-tasks.sql b/aquadoggo/migrations/20220617115933_create-tasks.sql new file mode 100644 index 000000000..296f74de8 --- /dev/null +++ b/aquadoggo/migrations/20220617115933_create-tasks.sql @@ -0,0 +1,7 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later + +CREATE TABLE IF NOT EXISTS tasks ( + name TEXT NOT NULL, + operation_id TEXT NOT NULL, + PRIMARY KEY (name, operation_id) +); diff --git a/aquadoggo/src/db/models/mod.rs b/aquadoggo/src/db/models/mod.rs index 469eb1311..5b59c46fb 100644 --- a/aquadoggo/src/db/models/mod.rs +++ b/aquadoggo/src/db/models/mod.rs @@ -4,7 +4,9 @@ pub mod document; mod entry; mod log; mod operation; +mod task; pub use self::log::LogRow; pub use entry::EntryRow; pub use operation::{OperationFieldsJoinedRow, OperationRow}; +pub use task::TaskRow; diff --git a/aquadoggo/src/db/models/task.rs b/aquadoggo/src/db/models/task.rs new file mode 100644 index 000000000..368ba3181 --- /dev/null +++ b/aquadoggo/src/db/models/task.rs @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use serde::Serialize; +use sqlx::FromRow; + +/// Representation of a row from the `tasks` table as stored in the database. +/// +/// This table holds all "pending" tasks of the materialization service worker. +#[derive(FromRow, Debug, Serialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct TaskRow { + /// Name of the task worker. + pub name: String, + + /// Related `OperationId` of the pending task. + pub operation_id: String, +} diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 5112c2b06..6382a9396 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use anyhow::Result; +use log::debug; use p2panda_rs::storage_provider::traits::OperationStore; use tokio::task; @@ -14,10 +15,10 @@ use crate::materializer::TaskInput; const CHANNEL_CAPACITY: usize = 1024; /// The materializer service waits for incoming new operations to transform them into actual useful -/// application and system data, like document views or schemas. +/// application- and system data, like document views or schemas. /// /// Internally the service uses a task queue which gives us the right architecture to deal with -/// operations coming in random order and avoid race-conditions which would occure otherwise when +/// operations coming in random order and avoid race-conditions which would occur otherwise when /// working on the same data in separate threads. pub async fn materializer_service( context: Context, @@ -40,7 +41,9 @@ pub async fn materializer_service( let mut rx = tx.subscribe(); // Reschedule tasks from last time which did not complete - pending_tasks().await?.iter().for_each(|task| { + let tasks = pending_tasks().await?; + debug!("Dispatch {} pending tasks from last runtime", tasks.len()); + tasks.iter().for_each(|task| { factory.queue(task.to_owned()); }); From 2dd9e937afad523dda88a98f7b9cc122a8393958 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:21:38 +0200 Subject: [PATCH 03/28] Add channel to inform about task status changes --- .../20220617115933_create-tasks.sql | 7 ++- aquadoggo/src/db/models/task.rs | 7 ++- aquadoggo/src/db/provider.rs | 6 +- aquadoggo/src/db/stores/mod.rs | 1 + aquadoggo/src/db/stores/task.rs | 26 ++++++++ aquadoggo/src/materializer/mod.rs | 1 + aquadoggo/src/materializer/service.rs | 59 ++++++++++++++----- aquadoggo/src/materializer/worker.rs | 55 ++++++++++++++++- 8 files changed, 138 insertions(+), 24 deletions(-) create mode 100644 aquadoggo/src/db/stores/task.rs diff --git a/aquadoggo/migrations/20220617115933_create-tasks.sql b/aquadoggo/migrations/20220617115933_create-tasks.sql index 296f74de8..257184bc5 100644 --- a/aquadoggo/migrations/20220617115933_create-tasks.sql +++ b/aquadoggo/migrations/20220617115933_create-tasks.sql @@ -1,7 +1,10 @@ -- SPDX-License-Identifier: AGPL-3.0-or-later +-- NOTE: On this level we can not assure eventual task duplicates, this is why we do +-- not have any SQL UNIQUE constraints. + CREATE TABLE IF NOT EXISTS tasks ( name TEXT NOT NULL, - operation_id TEXT NOT NULL, - PRIMARY KEY (name, operation_id) + document_id TEXT NULL, + document_view_id TEXT NULL, ); diff --git a/aquadoggo/src/db/models/task.rs b/aquadoggo/src/db/models/task.rs index 368ba3181..990bdb1ff 100644 --- a/aquadoggo/src/db/models/task.rs +++ b/aquadoggo/src/db/models/task.rs @@ -12,6 +12,9 @@ pub struct TaskRow { /// Name of the task worker. pub name: String, - /// Related `OperationId` of the pending task. - pub operation_id: String, + /// `DocumentId` of the task input. + pub document_id: String, + + /// `DocumentViewId` of the task input. + pub document_view_id: String, } diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index 6c75ffd58..5d5c7f81c 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,14 +1,12 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; -use sqlx::query_scalar; - use p2panda_rs::document::DocumentId; use p2panda_rs::hash::Hash; use p2panda_rs::storage_provider::traits::StorageProvider; +use sqlx::query_scalar; -use crate::db::stores::StorageEntry; -use crate::db::stores::StorageLog; +use crate::db::stores::{StorageEntry, StorageLog}; use crate::db::Pool; use crate::errors::StorageProviderResult; use crate::graphql::client::{ diff --git a/aquadoggo/src/db/stores/mod.rs b/aquadoggo/src/db/stores/mod.rs index 3c75de8a9..e459fa229 100644 --- a/aquadoggo/src/db/stores/mod.rs +++ b/aquadoggo/src/db/stores/mod.rs @@ -5,6 +5,7 @@ mod entry; mod log; mod operation; mod schema; +mod task; #[cfg(test)] pub mod test_utils; diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs new file mode 100644 index 000000000..c87063d7c --- /dev/null +++ b/aquadoggo/src/db/stores/task.rs @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::Result; +use p2panda_rs::document::{DocumentId, DocumentViewId}; + +use crate::db::models::TaskRow; +use crate::db::provider::SqlStorage; +use crate::materializer::{Task, TaskInput}; + +/// Methods to interact with the `tasks` table in the database. +impl SqlStorage { + /// Inserts a "pending" task into the database. + pub async fn insert_task(&self, task: &Task) -> Result<()> { + Ok(()) + } + + /// Removes a "pending" task from the database. + pub async fn remove_task(&self, task: &Task) -> Result<()> { + Ok(()) + } + + /// Returns "pending" tasks of the materialization service worker. + pub async fn get_tasks(&self) -> Result>> { + Ok(vec![]) + } +} diff --git a/aquadoggo/src/materializer/mod.rs b/aquadoggo/src/materializer/mod.rs index 36c818868..40a646cfc 100644 --- a/aquadoggo/src/materializer/mod.rs +++ b/aquadoggo/src/materializer/mod.rs @@ -7,3 +7,4 @@ mod worker; pub use input::TaskInput; pub use service::materializer_service; +pub use worker::Task; diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 6382a9396..34c55ae31 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -9,7 +9,7 @@ use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; use crate::manager::Shutdown; use crate::materializer::tasks::{dependency_task, reduce_task, schema_task}; -use crate::materializer::worker::{Factory, Task}; +use crate::materializer::worker::{Factory, Task, TaskStatus}; use crate::materializer::TaskInput; const CHANNEL_CAPACITY: usize = 1024; @@ -37,16 +37,51 @@ pub async fn materializer_service( // Get a listener for error signal from factory let on_error = factory.on_error(); - // Subscribe to communication bus - let mut rx = tx.subscribe(); + // Subscribe to status changes of tasks + let mut on_update = factory.on_update(); + let store = context.store.clone(); + + let status_handle = task::spawn(async move { + loop { + match on_update.recv().await { + Ok(TaskStatus::Pending(task)) => { + store + .insert_task(&task) + .await + // @TODO + .expect("Failed inserting task"); + } + Ok(TaskStatus::Completed(task)) => { + store + .remove_task(&task) + .await + // @TODO + .expect("Failed removing task"); + } + Err(_) => { + // @TODO + panic!("Failed") + } + } + } + }); // Reschedule tasks from last time which did not complete - let tasks = pending_tasks().await?; + let tasks = context + .store + .get_tasks() + .await + .expect("Failed retreiving pending tasks from database"); + debug!("Dispatch {} pending tasks from last runtime", tasks.len()); + tasks.iter().for_each(|task| { factory.queue(task.to_owned()); }); + // Subscribe to communication bus + let mut rx = tx.subscribe(); + // Listen to incoming new entries and operations and move them into task queue let handle = task::spawn(async move { while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { @@ -55,8 +90,10 @@ pub async fn materializer_service( .store .get_document_by_operation_id(&operation_id) .await - .unwrap() - { + .expect(&format!( + "Failed database query when retreiving document for operation_id {}", + operation_id + )) { Some(document_id) => { // Dispatch "reduce" task which will materialize the regarding document factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None))); @@ -75,18 +112,10 @@ pub async fn materializer_service( // Wait until we received the application shutdown signal or handle closed tokio::select! { _ = handle => (), + _ = status_handle => (), _ = shutdown => (), _ = on_error => (), } Ok(()) } - -/// Retreives a list of pending tasks from the database and returns them as inputs for the task -/// queue. -/// -/// This list represents all tasks which were not completed during the last runtime, as the node -/// exited before. -async fn pending_tasks() -> Result>> { - Ok(vec![]) -} diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 871790de6..9eb077938 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -85,7 +85,7 @@ use std::sync::{Arc, Mutex}; use deadqueue::unlimited::Queue; use log::{error, info}; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::broadcast::{channel, Sender}; +use tokio::sync::broadcast::{channel, Receiver, Sender}; use tokio::task; use triggered::{Listener, Trigger}; @@ -117,6 +117,16 @@ pub enum TaskError { Failure, } +/// Enum representing status of a task. +#[derive(Debug, Clone)] +pub enum TaskStatus { + /// Task just got scheduled and waiting to be processed. + Pending(Task), + + /// Task completed successfully. + Completed(Task), +} + /// Workers are identified by simple string values. pub type WorkerName = String; @@ -235,6 +245,9 @@ where /// Broadcast channel to inform worker pools about new tasks. tx: Sender>, + /// Broadcast channel to inform callbacks about pending or completed tasks. + tx_status: Sender>, + /// Sender of error signal. error_signal: Trigger, @@ -259,12 +272,14 @@ where /// incoming tasks. pub fn new(context: D, capacity: usize) -> Self { let (tx, _) = channel(capacity); + let (tx_status, _) = channel(capacity); let (error_signal, error_handle) = triggered::trigger(); Self { context, managers: HashMap::new(), tx, + tx_status, error_signal, error_handle, } @@ -324,6 +339,11 @@ where self.error_handle.clone() } + /// Subscribe to status changes of tasks. + pub fn on_update(&self) -> Receiver> { + self.tx_status.subscribe() + } + /// Spawns a task which listens to broadcast channel for incoming new tasks which might be /// added to the worker queue. fn spawn_dispatcher(&self, name: &str) { @@ -333,6 +353,9 @@ where // Subscribe to the broadcast channel let mut rx = self.tx.subscribe(); + // Create handle to send task status updates + let tx_status = self.tx_status.clone(); + // Initialise a new counter to provide unique task ids let counter = AtomicU64::new(0); @@ -345,6 +368,15 @@ where let error_signal = self.error_signal.clone(); task::spawn(async move { + // Inform status subscribers that we've just scheduled a new task + let on_pending = |task: Task| match tx_status.send(TaskStatus::Pending(task)) { + Err(err) => { + error!("Error while sending task status: {}", err); + error_signal.trigger(); + } + _ => (), + }; + loop { match rx.recv().await { // A new task got announced in the broadcast channel! @@ -359,6 +391,9 @@ where if index.contains(&task.1) { continue; // Task already exists } else { + // Trigger status update + on_pending(task.clone()); + // Generate a unique id for this new task and add it to queue let next_id = counter.fetch_add(1, Ordering::Relaxed); queue.push(QueueItem::new(next_id, task.1.clone())); @@ -403,11 +438,26 @@ where let queue = manager.queue.clone(); let input_index = manager.input_index.clone(); let tx = self.tx.clone(); + let name = name.to_string(); // Create handle for error signal let error_signal = self.error_signal.clone(); + // Create handle to send task status updates + let tx_status = self.tx_status.clone(); + task::spawn(async move { + // Inform status subscribers that we just completed a task + let on_complete = |input: IN| match tx_status + .send(TaskStatus::Completed(Task::new(&name, input))) + { + Err(err) => { + error!("Error while sending task status: {}", err); + error_signal.trigger(); + } + _ => (), + }; + loop { // Wait until there is a new task arriving in the queue let item = queue.pop().await; @@ -429,6 +479,9 @@ where // .. check the task result .. match result { Ok(Some(list)) => { + // Trigger status update + on_complete(item.input()); + // Tasks succeeded and dispatches new, subsequent tasks for task in list { if let Err(err) = tx.send(task) { From cf772bab5aeb58c62925a6c8f8ce849fc62d666e Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:22:22 +0200 Subject: [PATCH 04/28] Fix SQL syntax error --- aquadoggo/migrations/20220617115933_create-tasks.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/migrations/20220617115933_create-tasks.sql b/aquadoggo/migrations/20220617115933_create-tasks.sql index 257184bc5..20768ed1c 100644 --- a/aquadoggo/migrations/20220617115933_create-tasks.sql +++ b/aquadoggo/migrations/20220617115933_create-tasks.sql @@ -6,5 +6,5 @@ CREATE TABLE IF NOT EXISTS tasks ( name TEXT NOT NULL, document_id TEXT NULL, - document_view_id TEXT NULL, + document_view_id TEXT NULL ); From 511c722ab5a247df8082b6e0cb1faf7e346b040a Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:26:12 +0200 Subject: [PATCH 05/28] Nicer error messages --- aquadoggo/src/materializer/service.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 34c55ae31..c3e4e0106 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -41,6 +41,8 @@ pub async fn materializer_service( let mut on_update = factory.on_update(); let store = context.store.clone(); + // Keep track of status changes and persist it in the database. This allows us to pick up + // uncompleted tasks next time we come back here. let status_handle = task::spawn(async move { loop { match on_update.recv().await { @@ -48,19 +50,16 @@ pub async fn materializer_service( store .insert_task(&task) .await - // @TODO - .expect("Failed inserting task"); + .expect("Failed inserting pending task into database"); } Ok(TaskStatus::Completed(task)) => { store .remove_task(&task) .await - // @TODO - .expect("Failed removing task"); + .expect("Failed removing completed task from database"); } - Err(_) => { - // @TODO - panic!("Failed") + Err(err) => { + panic!("Failed receiving task status updates: {}", err) } } } @@ -117,5 +116,8 @@ pub async fn materializer_service( _ = on_error => (), } + // @TODO: Wait until all pending tasks have been completed. Related issue: + // https://github.com/p2panda/aquadoggo/issues/164 + Ok(()) } From e61a1c60dac9e8e33c135ce293706b13704e9d0c Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:27:27 +0200 Subject: [PATCH 06/28] Move TODO comment into right match arm --- aquadoggo/src/materializer/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index c3e4e0106..e4ce7f452 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -112,12 +112,12 @@ pub async fn materializer_service( tokio::select! { _ = handle => (), _ = status_handle => (), - _ = shutdown => (), + _ = shutdown => { + // @TODO: Wait until all pending tasks have been completed during graceful shutdown. + // Related issue: https://github.com/p2panda/aquadoggo/issues/164 + }, _ = on_error => (), } - // @TODO: Wait until all pending tasks have been completed. Related issue: - // https://github.com/p2panda/aquadoggo/issues/164 - Ok(()) } From 5e02d4f18b1dba50aa0aa760e47461d6ea05bd07 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:35:29 +0200 Subject: [PATCH 07/28] Adding some more docstrings --- aquadoggo/src/materializer/input.rs | 10 ++++++++++ aquadoggo/src/materializer/service.rs | 11 ++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/input.rs b/aquadoggo/src/materializer/input.rs index 036ac33d4..78eca34af 100644 --- a/aquadoggo/src/materializer/input.rs +++ b/aquadoggo/src/materializer/input.rs @@ -2,13 +2,23 @@ use p2panda_rs::document::{DocumentId, DocumentViewId}; +/// Input of every task worker containing all information we need to process. +/// +/// The workers are designed such that they EITHER await a `DocumentId` OR a `DocumentViewId`. +/// Setting both values `None` or both values `Some` will be rejected. #[derive(Clone, Eq, PartialEq, Debug, Hash)] pub struct TaskInput { + /// Specifying a `DocumentId`, indicating that we're interested in processing the "latest" + /// state of that document. pub document_id: Option, + + /// Specifying a `DocumentViewId`, indicating that we're interested in processing the state of + /// that document view at this point. pub document_view_id: Option, } impl TaskInput { + /// Returns a new instance of `TaskInput`. pub fn new(document_id: Option, document_view_id: Option) -> Self { Self { document_id, diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index e4ce7f452..78700c183 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -12,6 +12,10 @@ use crate::materializer::tasks::{dependency_task, reduce_task, schema_task}; use crate::materializer::worker::{Factory, Task, TaskStatus}; use crate::materializer::TaskInput; +/// Capacity of the internal broadcast channels used inside the worker factory. +/// +/// This gives an upper bound to maximum status messages and incoming tasks being moved into worker +/// queues the channels can handle at once. const CHANNEL_CAPACITY: usize = 1024; /// The materializer service waits for incoming new operations to transform them into actual useful @@ -42,11 +46,16 @@ pub async fn materializer_service( let store = context.store.clone(); // Keep track of status changes and persist it in the database. This allows us to pick up - // uncompleted tasks next time we come back here. + // uncompleted tasks next time we start the node. let status_handle = task::spawn(async move { loop { match on_update.recv().await { Ok(TaskStatus::Pending(task)) => { + debug!( + "Scheduled new task for worker {} with input {}", + task.0, task.1 + ); + store .insert_task(&task) .await From 4270a36f91f11de681016212dc83ba6157d29347 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 15:38:06 +0200 Subject: [PATCH 08/28] Remove debug statement --- aquadoggo/src/materializer/service.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 78700c183..ae72f0266 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -51,11 +51,6 @@ pub async fn materializer_service( loop { match on_update.recv().await { Ok(TaskStatus::Pending(task)) => { - debug!( - "Scheduled new task for worker {} with input {}", - task.0, task.1 - ); - store .insert_task(&task) .await From cc36001eca676e1fec995e3531f7672753a5eabd Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 16:23:02 +0200 Subject: [PATCH 09/28] Implement SQL queries to deal with tasks in database --- aquadoggo/src/db/errors.rs | 17 +++-- aquadoggo/src/db/models/task.rs | 4 +- aquadoggo/src/db/stores/task.rs | 104 +++++++++++++++++++++++++-- aquadoggo/src/materializer/worker.rs | 10 +++ 4 files changed, 124 insertions(+), 11 deletions(-) diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs index 2e43ba4c8..41dc602ed 100644 --- a/aquadoggo/src/db/errors.rs +++ b/aquadoggo/src/db/errors.rs @@ -1,9 +1,18 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::{ - document::{DocumentId, DocumentViewId}, - schema::{system::SystemSchemaError, SchemaError, SchemaIdError}, -}; +use p2panda_rs::document::{DocumentId, DocumentViewId}; +use p2panda_rs::schema::system::SystemSchemaError; +use p2panda_rs::schema::{SchemaError, SchemaIdError}; + +/// `SQLStorage` errors. +#[derive(thiserror::Error, Debug)] +pub enum SqlStorageError { + #[error("SQL query failed: {0}")] + TransactionFailed(String), + + #[error("Insertion of row into table {0} did not show any effect")] + InsertionFailed(String), +} /// `DocumentStore` errors. #[derive(thiserror::Error, Debug)] diff --git a/aquadoggo/src/db/models/task.rs b/aquadoggo/src/db/models/task.rs index 990bdb1ff..4bc4d544d 100644 --- a/aquadoggo/src/db/models/task.rs +++ b/aquadoggo/src/db/models/task.rs @@ -13,8 +13,8 @@ pub struct TaskRow { pub name: String, /// `DocumentId` of the task input. - pub document_id: String, + pub document_id: Option, /// `DocumentViewId` of the task input. - pub document_view_id: String, + pub document_view_id: Option, } diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index c87063d7c..455792880 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -2,7 +2,9 @@ use anyhow::Result; use p2panda_rs::document::{DocumentId, DocumentViewId}; +use sqlx::{query, query_as}; +use crate::db::errors::SqlStorageError; use crate::db::models::TaskRow; use crate::db::provider::SqlStorage; use crate::materializer::{Task, TaskInput}; @@ -10,17 +12,109 @@ use crate::materializer::{Task, TaskInput}; /// Methods to interact with the `tasks` table in the database. impl SqlStorage { /// Inserts a "pending" task into the database. - pub async fn insert_task(&self, task: &Task) -> Result<()> { - Ok(()) + pub async fn insert_task(&self, task: &Task) -> Result<(), SqlStorageError> { + // Convert task input to correct database types + let task_input = task.input(); + let document_id = task_input.document_id.as_ref().map(|id| id.to_string()); + let document_view_id = task_input + .document_view_id + .as_ref() + .map(|view_id| view_id.to_string()); + + // Insert task into database + let result = query( + " + INSERT INTO + tasks ( + name, + document_id, + document_view_id + ) + VALUES + ($1, $2, $3) + ", + ) + .bind(task.worker_name()) + .bind(document_id) + .bind(document_view_id) + .execute(&self.pool) + .await + .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + + if result.rows_affected() != 1 { + Err(SqlStorageError::InsertionFailed("tasks".into())) + } else { + Ok(()) + } } /// Removes a "pending" task from the database. - pub async fn remove_task(&self, task: &Task) -> Result<()> { + pub async fn remove_task(&self, task: &Task) -> Result<(), SqlStorageError> { + // Convert task input to correct database types + let task_input = task.input(); + let document_id = task_input.document_id.as_ref().map(|id| id.to_string()); + let document_view_id = task_input + .document_view_id + .as_ref() + .map(|view_id| view_id.to_string()); + + // Remove task from database + query( + " + DELETE + FROM + tasks + WHERE + name = $1 + AND document_id = $2 + AND document_view_id = $3 + ", + ) + .bind(task.worker_name()) + .bind(document_id) + .bind(document_view_id) + .execute(&self.pool) + .await + .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + Ok(()) } /// Returns "pending" tasks of the materialization service worker. - pub async fn get_tasks(&self) -> Result>> { - Ok(vec![]) + pub async fn get_tasks(&self) -> Result>, SqlStorageError> { + let task_rows = query_as::<_, TaskRow>( + " + SELECT + name + document_id, + document_view_id + FROM + tasks + ", + ) + .fetch_all(&self.pool) + .await + .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + + // Convert database rows into correct p2panda types + let mut tasks: Vec> = Vec::new(); + for task in task_rows { + let document_id: Option = task + .document_id + .map(|id| id.parse().expect("Invalid document id stored in database")); + + let document_view_id: Option = task.document_view_id.map(|view_id| { + view_id + .parse() + .expect("Invalid document view id stored in database") + }); + + tasks.push(Task::new( + &task.name, + TaskInput::new(document_id, document_view_id), + )); + } + + Ok(tasks) } } diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 9eb077938..31d7f8485 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -99,6 +99,16 @@ impl Task { pub fn new(worker_name: &str, input: IN) -> Self { Self(worker_name.into(), input) } + + /// Returns worker name of task; + pub fn worker_name(&self) -> &WorkerName { + &self.0 + } + + /// Returns task input; + pub fn input(&self) -> &IN { + &self.1 + } } /// Return value of every processed task indicating if it succeeded or failed. From 9c04573d279e57e94e27b0ad1d151afc84a5f2b9 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 16:25:58 +0200 Subject: [PATCH 10/28] Clippy happy, coder happy --- aquadoggo/src/materializer/service.rs | 10 ++++++---- aquadoggo/src/materializer/worker.rs | 13 +++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index ae72f0266..10a2a9e39 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -93,10 +93,12 @@ pub async fn materializer_service( .store .get_document_by_operation_id(&operation_id) .await - .expect(&format!( - "Failed database query when retreiving document for operation_id {}", - operation_id - )) { + .unwrap_or_else(|_| { + panic!( + "Failed database query when retreiving document for operation_id {}", + operation_id + ) + }) { Some(document_id) => { // Dispatch "reduce" task which will materialize the regarding document factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None))); diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 31d7f8485..b1bb0f8aa 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -379,12 +379,11 @@ where task::spawn(async move { // Inform status subscribers that we've just scheduled a new task - let on_pending = |task: Task| match tx_status.send(TaskStatus::Pending(task)) { - Err(err) => { + let on_pending = |task: Task| { + if let Err(err) = tx_status.send(TaskStatus::Pending(task)) { error!("Error while sending task status: {}", err); error_signal.trigger(); } - _ => (), }; loop { @@ -458,14 +457,12 @@ where task::spawn(async move { // Inform status subscribers that we just completed a task - let on_complete = |input: IN| match tx_status - .send(TaskStatus::Completed(Task::new(&name, input))) - { - Err(err) => { + let on_complete = |input: IN| { + if let Err(err) = tx_status.send(TaskStatus::Completed(Task::new(&name, input))) + { error!("Error while sending task status: {}", err); error_signal.trigger(); } - _ => (), }; loop { From 1114cb1032cb57cd443d6b254e555f149370ee74 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 16:28:05 +0200 Subject: [PATCH 11/28] Minor change --- aquadoggo/src/materializer/worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index b1bb0f8aa..b3a2308f8 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -458,8 +458,8 @@ where task::spawn(async move { // Inform status subscribers that we just completed a task let on_complete = |input: IN| { - if let Err(err) = tx_status.send(TaskStatus::Completed(Task::new(&name, input))) - { + let status = TaskStatus::Completed(Task::new(&name, input)); + if let Err(err) = tx_status.send(status) { error!("Error while sending task status: {}", err); error_signal.trigger(); } From 72a454f8e025a5ac5cf3d6f3580e792ab7c350d3 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 17:10:03 +0200 Subject: [PATCH 12/28] Add tests for store, fix bugs --- aquadoggo/src/db/errors.rs | 7 ++- aquadoggo/src/db/stores/task.rs | 84 ++++++++++++++++++++------- aquadoggo/src/db/stores/test_utils.rs | 4 +- aquadoggo/src/db/utils.rs | 1 - aquadoggo/src/materializer/worker.rs | 2 +- 5 files changed, 72 insertions(+), 26 deletions(-) diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs index 41dc602ed..124d8a221 100644 --- a/aquadoggo/src/db/errors.rs +++ b/aquadoggo/src/db/errors.rs @@ -8,10 +8,13 @@ use p2panda_rs::schema::{SchemaError, SchemaIdError}; #[derive(thiserror::Error, Debug)] pub enum SqlStorageError { #[error("SQL query failed: {0}")] - TransactionFailed(String), + Transaction(String), #[error("Insertion of row into table {0} did not show any effect")] - InsertionFailed(String), + Insertion(String), + + #[error("Deletion of row from table {0} did not show any effect")] + Deletion(String), } /// `DocumentStore` errors. diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index 455792880..378c0e456 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -19,7 +19,7 @@ impl SqlStorage { let document_view_id = task_input .document_view_id .as_ref() - .map(|view_id| view_id.to_string()); + .map(|view_id| view_id.as_str()); // Insert task into database let result = query( @@ -39,10 +39,10 @@ impl SqlStorage { .bind(document_view_id) .execute(&self.pool) .await - .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; if result.rows_affected() != 1 { - Err(SqlStorageError::InsertionFailed("tasks".into())) + Err(SqlStorageError::Insertion("tasks".into())) } else { Ok(()) } @@ -56,18 +56,17 @@ impl SqlStorage { let document_view_id = task_input .document_view_id .as_ref() - .map(|view_id| view_id.to_string()); + .map(|view_id| view_id.as_str()); // Remove task from database - query( + let result = query( " - DELETE - FROM + DELETE FROM tasks WHERE - name = $1 - AND document_id = $2 - AND document_view_id = $3 + name IS $1 + AND document_id IS $2 + AND document_view_id IS $3 ", ) .bind(task.worker_name()) @@ -75,9 +74,13 @@ impl SqlStorage { .bind(document_view_id) .execute(&self.pool) .await - .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; - Ok(()) + if result.rows_affected() != 1 { + Err(SqlStorageError::Deletion("tasks".into())) + } else { + Ok(()) + } } /// Returns "pending" tasks of the materialization service worker. @@ -85,7 +88,7 @@ impl SqlStorage { let task_rows = query_as::<_, TaskRow>( " SELECT - name + name, document_id, document_view_id FROM @@ -94,19 +97,20 @@ impl SqlStorage { ) .fetch_all(&self.pool) .await - .map_err(|err| SqlStorageError::TransactionFailed(err.to_string()))?; + .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; // Convert database rows into correct p2panda types let mut tasks: Vec> = Vec::new(); for task in task_rows { - let document_id: Option = task - .document_id - .map(|id| id.parse().expect("Invalid document id stored in database")); + let document_id: Option = task.document_id.map(|id| { + id.parse() + .unwrap_or_else(|_| panic!("Invalid document id stored in database {}", id)) + }); let document_view_id: Option = task.document_view_id.map(|view_id| { - view_id - .parse() - .expect("Invalid document view id stored in database") + view_id.parse().unwrap_or_else(|_| { + panic!("Invalid document view id stored in database: {}", view_id) + }) }); tasks.push(Task::new( @@ -118,3 +122,43 @@ impl SqlStorage { Ok(tasks) } } + +#[cfg(test)] +mod tests { + use p2panda_rs::document::DocumentViewId; + use p2panda_rs::test_utils::fixtures::document_view_id; + use rstest::rstest; + + use crate::db::stores::test_utils::{test_db, TestSqlStore}; + use crate::materializer::{Task, TaskInput}; + + #[rstest] + #[tokio::test] + async fn insert_get_remove_tasks( + document_view_id: DocumentViewId, + #[from(test_db)] + #[future] + db: TestSqlStore, + ) { + let db = db.await; + + // Prepare test data + let task = Task::new("reduce", TaskInput::new(None, Some(document_view_id))); + + // Insert task + let result = db.store.insert_task(&task).await; + assert!(result.is_ok(), "{:?}", result); + + // Check if task exists in database + let result = db.store.get_tasks().await; + assert_eq!(result.unwrap(), vec![task.clone()]); + + // Remove task + let result = db.store.remove_task(&task).await; + assert!(result.is_ok(), "{:?}", result); + + // Check if all tasks got removed + let result = db.store.get_tasks().await; + assert_eq!(result.unwrap(), vec![]); + } +} diff --git a/aquadoggo/src/db/stores/test_utils.rs b/aquadoggo/src/db/stores/test_utils.rs index 3624a8fe6..9baf60cd4 100644 --- a/aquadoggo/src/db/stores/test_utils.rs +++ b/aquadoggo/src/db/stores/test_utils.rs @@ -168,8 +168,8 @@ pub async fn insert_entry_operation_and_view( (document_id, document_view_id) } -/// Container for `SqlStore` with access to the document ids and key_pairs -/// used in the pre-populated database for testing. +/// Container for `SqlStore` with access to the document ids and key_pairs used in the +/// pre-populated database for testing. pub struct TestSqlStore { pub store: SqlStorage, pub key_pairs: Vec, diff --git a/aquadoggo/src/db/utils.rs b/aquadoggo/src/db/utils.rs index 8ecaa182f..8619f5cb9 100644 --- a/aquadoggo/src/db/utils.rs +++ b/aquadoggo/src/db/utils.rs @@ -328,7 +328,6 @@ pub fn parse_document_view_field_rows( #[cfg(test)] mod tests { - use p2panda_rs::document::DocumentViewValue; use p2panda_rs::operation::{ AsOperation, OperationId, OperationValue, PinnedRelation, PinnedRelationList, Relation, diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index b3a2308f8..1d3e505db 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -91,7 +91,7 @@ use triggered::{Listener, Trigger}; /// A task holding a generic input value and the name of the worker which will process it /// eventually. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct Task(pub WorkerName, IN); impl Task { From a53e0a373430a30483c24fce760340344483928d Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 17:40:55 +0200 Subject: [PATCH 13/28] Add another test for on_update, fix bugs --- aquadoggo/src/materializer/worker.rs | 52 ++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 1d3e505db..0c0cddd9e 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -128,7 +128,7 @@ pub enum TaskError { } /// Enum representing status of a task. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum TaskStatus { /// Task just got scheduled and waiting to be processed. Pending(Task), @@ -483,12 +483,14 @@ where } } - // .. check the task result .. + // Trigger status update when successful + if result.is_ok() { + on_complete(item.input()); + } + + // Check the result match result { Ok(Some(list)) => { - // Trigger status update - on_complete(item.input()); - // Tasks succeeded and dispatches new, subsequent tasks for task in list { if let Err(err) = tx.send(task) { @@ -522,7 +524,7 @@ mod tests { use rand::seq::SliceRandom; use rand::Rng; - use super::{Factory, Task, TaskError, TaskResult}; + use super::{Factory, Task, TaskError, TaskResult, TaskStatus}; #[tokio::test] async fn factory() { @@ -566,6 +568,44 @@ mod tests { assert!(factory.is_empty("second")); } + #[tokio::test] + async fn on_update_subscription() { + type Input = usize; + type Data = usize; + + // Initialise factory + let mut factory = Factory::::new(1, 1024); + + // Record all status changes in this array + let messages: Arc>>> = Arc::new(Mutex::new(Vec::new())); + + // Subscribe to updates and record them + let mut on_update = factory.on_update(); + let messages_clone = messages.clone(); + tokio::task::spawn(async move { + loop { + let message = on_update.recv().await.unwrap(); + messages_clone.lock().unwrap().push(message); + } + }); + + // Define worker and register it + factory.register("test", 1, |_, _| async { Ok(None) }); + + // Queue a couple of tasks + for i in 0..3 { + factory.queue(Task::new("test", i)); + } + + // Wait until work was done .. + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(factory.is_empty("test")); + + // We expect a total of 6 recorded status messages: 3 tasks have been scheduled, and 3 + // completed + assert_eq!(messages.lock().unwrap().len(), 6); + } + #[tokio::test] async fn jigsaw() { // This test solves multiple jigsaw puzzles with our task queue implementation. From db73617bb39b99e9074ff60120d693228c621525 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 18:50:12 +0200 Subject: [PATCH 14/28] Silently fail when subscribers are missing --- aquadoggo/src/graphql/client/mutation.rs | 7 +++++-- aquadoggo/src/materializer/worker.rs | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/aquadoggo/src/graphql/client/mutation.rs b/aquadoggo/src/graphql/client/mutation.rs index 6a64dcd1b..8c0300bc6 100644 --- a/aquadoggo/src/graphql/client/mutation.rs +++ b/aquadoggo/src/graphql/client/mutation.rs @@ -65,9 +65,12 @@ impl ClientMutationRoot { // Send new operation on service communication bus, this will arrive eventually at // the materializer service - tx.send(ServiceMessage::NewOperation( + if let Err(_) = tx.send(ServiceMessage::NewOperation( verified_operation.operation_id().to_owned(), - ))?; + )) { + // Silently fail here as we don't mind if there are no subscribers. We have + // tests in other places to check if messages arrive. + } Ok(response) } diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 0c0cddd9e..c50255d38 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -380,9 +380,9 @@ where task::spawn(async move { // Inform status subscribers that we've just scheduled a new task let on_pending = |task: Task| { - if let Err(err) = tx_status.send(TaskStatus::Pending(task)) { - error!("Error while sending task status: {}", err); - error_signal.trigger(); + if let Err(_) = tx_status.send(TaskStatus::Pending(task)) { + // Silently fail here since an error only occurs here when there are no + // subscribers, but we don't mind that. } }; @@ -459,9 +459,9 @@ where // Inform status subscribers that we just completed a task let on_complete = |input: IN| { let status = TaskStatus::Completed(Task::new(&name, input)); - if let Err(err) = tx_status.send(status) { - error!("Error while sending task status: {}", err); - error_signal.trigger(); + if let Err(_) = tx_status.send(status) { + // Silently fail here since an error only occurs here when there are no + // subscribers, but we don't mind that. } }; From fe5f97f5d3de28b6ce6eac1b7ea512a00e7091ed Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 18:50:21 +0200 Subject: [PATCH 15/28] Add a test for materializer service --- aquadoggo/src/materializer/service.rs | 96 +++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 10a2a9e39..2f1a3fe2b 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -127,3 +127,99 @@ pub async fn materializer_service( Ok(()) } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use p2panda_rs::operation::{AsVerifiedOperation, OperationValue}; + use p2panda_rs::storage_provider::traits::OperationStore; + use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; + use rstest::rstest; + use tokio::sync::broadcast; + use tokio::task; + + use crate::context::Context; + use crate::db::stores::test_utils::{test_db, TestSqlStore}; + use crate::db::traits::DocumentStore; + use crate::Configuration; + + use super::materializer_service; + + #[rstest] + #[tokio::test] + async fn materialize_document_from_bus( + #[from(test_db)] + #[with(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("name", OperationValue::Text("panda".into()))])] + #[future] + db: TestSqlStore, + ) { + // Prepare database which inserts data for one document + let db = db.await; + + // Identify document and operation which was inserted for testing + let document_id = db.documents.first().unwrap(); + let verified_operation = db + .store + .get_operations_by_document_id(document_id) + .await + .unwrap() + .first() + .unwrap() + .to_owned(); + + // We expect that the database does not contain any materialized document yet + assert!(db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .is_none()); + + // Prepare arguments for service + let context = Context::new(db.store.clone(), Configuration::default()); + let shutdown = task::spawn(async { + loop { + // Do this forever .. this means that the shutdown handler will never resolve + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let (tx, _) = broadcast::channel(1024); + + // Start materializer service + let tx_clone = tx.clone(); + let handle = tokio::spawn(async move { + materializer_service(context, shutdown, tx_clone) + .await + .unwrap(); + }); + + // Wait for service to be ready .. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Send a message over the bus which kicks in materialization + tx.send(crate::bus::ServiceMessage::NewOperation( + verified_operation.operation_id().to_owned(), + )) + .unwrap(); + + // Wait a little bit for work being done .. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Make sure the service did not crash and is still running + assert_eq!(handle.is_finished(), false); + + // Check database for materialized documents + let document = db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .expect("We expect that the document is `Some`"); + assert_eq!(document.id().as_str(), document_id.as_str()); + assert_eq!( + document.fields().get("name").unwrap().value().to_owned(), + OperationValue::Text("panda".into()) + ); + } +} From 7e61ee459fa4e9a33d1bd8ef38e54df93f6c3f04 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 18:53:50 +0200 Subject: [PATCH 16/28] Happy clippy, happy developer --- aquadoggo/src/graphql/client/mutation.rs | 9 ++++++--- aquadoggo/src/materializer/worker.rs | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/aquadoggo/src/graphql/client/mutation.rs b/aquadoggo/src/graphql/client/mutation.rs index 8c0300bc6..6c6626a64 100644 --- a/aquadoggo/src/graphql/client/mutation.rs +++ b/aquadoggo/src/graphql/client/mutation.rs @@ -65,9 +65,12 @@ impl ClientMutationRoot { // Send new operation on service communication bus, this will arrive eventually at // the materializer service - if let Err(_) = tx.send(ServiceMessage::NewOperation( - verified_operation.operation_id().to_owned(), - )) { + if tx + .send(ServiceMessage::NewOperation( + verified_operation.operation_id().to_owned(), + )) + .is_err() + { // Silently fail here as we don't mind if there are no subscribers. We have // tests in other places to check if messages arrive. } diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index c50255d38..e7dbc17a8 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -380,7 +380,7 @@ where task::spawn(async move { // Inform status subscribers that we've just scheduled a new task let on_pending = |task: Task| { - if let Err(_) = tx_status.send(TaskStatus::Pending(task)) { + if tx_status.send(TaskStatus::Pending(task)).is_err() { // Silently fail here since an error only occurs here when there are no // subscribers, but we don't mind that. } @@ -459,7 +459,7 @@ where // Inform status subscribers that we just completed a task let on_complete = |input: IN| { let status = TaskStatus::Completed(Task::new(&name, input)); - if let Err(_) = tx_status.send(status) { + if tx_status.send(status).is_err() { // Silently fail here since an error only occurs here when there are no // subscribers, but we don't mind that. } From 26000526e237b6900a04f431aad8c7e8c2e1daa2 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:11:54 +0200 Subject: [PATCH 17/28] Make tasks unique, check duplicates before insertion --- .../20220617115933_create-tasks.sql | 6 ++--- aquadoggo/src/db/stores/task.rs | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/aquadoggo/migrations/20220617115933_create-tasks.sql b/aquadoggo/migrations/20220617115933_create-tasks.sql index 20768ed1c..db3371e47 100644 --- a/aquadoggo/migrations/20220617115933_create-tasks.sql +++ b/aquadoggo/migrations/20220617115933_create-tasks.sql @@ -1,10 +1,8 @@ -- SPDX-License-Identifier: AGPL-3.0-or-later --- NOTE: On this level we can not assure eventual task duplicates, this is why we do --- not have any SQL UNIQUE constraints. - CREATE TABLE IF NOT EXISTS tasks ( name TEXT NOT NULL, document_id TEXT NULL, - document_view_id TEXT NULL + document_view_id TEXT NULL, + PRIMARY KEY (name, document_id, document_view_id) ); diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index 378c0e456..5a0e2d2b8 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -15,12 +15,32 @@ impl SqlStorage { pub async fn insert_task(&self, task: &Task) -> Result<(), SqlStorageError> { // Convert task input to correct database types let task_input = task.input(); - let document_id = task_input.document_id.as_ref().map(|id| id.to_string()); + let document_id = task_input.document_id.as_ref().map(|id| id.as_str()); let document_view_id = task_input .document_view_id .as_ref() .map(|view_id| view_id.as_str()); + // Check first if this task already exists, to avoid duplicate rows + let task_row = query_as::<_, TaskRow>( + " + SELECT + name, + document_id, + document_view_id + FROM + tasks + ", + ) + .fetch_optional(&self.pool) + .await + .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; + + // If yes, we are already done here + if task_row.is_some() { + return Ok(()) + } + // Insert task into database let result = query( " @@ -52,7 +72,7 @@ impl SqlStorage { pub async fn remove_task(&self, task: &Task) -> Result<(), SqlStorageError> { // Convert task input to correct database types let task_input = task.input(); - let document_id = task_input.document_id.as_ref().map(|id| id.to_string()); + let document_id = task_input.document_id.as_ref().map(|id| id.as_str()); let document_view_id = task_input .document_view_id .as_ref() From f27656e224990ea57722a1f52137b7bf46a99e77 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:12:05 +0200 Subject: [PATCH 18/28] Add a test for re-scheduled tasks --- aquadoggo/src/materializer/service.rs | 64 +++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 2f1a3fe2b..72e019aaf 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -142,6 +142,7 @@ mod tests { use crate::context::Context; use crate::db::stores::test_utils::{test_db, TestSqlStore}; use crate::db::traits::DocumentStore; + use crate::materializer::{Task, TaskInput}; use crate::Configuration; use super::materializer_service; @@ -222,4 +223,67 @@ mod tests { OperationValue::Text("panda".into()) ); } + + #[rstest] + #[tokio::test] + async fn materialize_document_from_last_runtime( + #[from(test_db)] + #[with(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("name", OperationValue::Text("panda".into()))])] + #[future] + db: TestSqlStore, + ) { + // Prepare database which inserts data for one document + let db = db.await; + + // Identify document and operation which was inserted for testing + let document_id = db.documents.first().unwrap(); + + // Store a pending "reduce" task from last runtime in the database so it gets picked up by + // the materializer service + db.store + .insert_task(&Task::new( + "reduce", + TaskInput::new(Some(document_id.to_owned()), None), + )) + .await + .unwrap(); + + // Prepare arguments for service + let context = Context::new(db.store.clone(), Configuration::default()); + let shutdown = task::spawn(async { + loop { + // Do this forever .. this means that the shutdown handler will never resolve + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let (tx, _) = broadcast::channel(1024); + + // Start materializer service + let tx_clone = tx.clone(); + let handle = tokio::spawn(async move { + materializer_service(context, shutdown, tx_clone) + .await + .unwrap(); + }); + + // Wait for service to be done .. it should materialize the document since it was waiting + // as a "pending" task in the database + tokio::time::sleep(Duration::from_millis(100)).await; + + // Make sure the service did not crash and is still running + assert_eq!(handle.is_finished(), false); + + // Check database for materialized documents + let document = db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .expect("We expect that the document is `Some`"); + assert_eq!(document.id().as_str(), document_id.as_str()); + assert_eq!( + document.fields().get("name").unwrap().value().to_owned(), + OperationValue::Text("panda".into()) + ); + } } From e9966d8e85e86df2640512631cfc6807be13757c Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:12:32 +0200 Subject: [PATCH 19/28] Happy peppi --- aquadoggo/src/db/stores/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index 5a0e2d2b8..b4ed26afb 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -38,7 +38,7 @@ impl SqlStorage { // If yes, we are already done here if task_row.is_some() { - return Ok(()) + return Ok(()); } // Insert task into database From 85472b1f4679391e2d56c3d79c03e7b9dabb93aa Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:13:32 +0200 Subject: [PATCH 20/28] Add entry to CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8f552712..b1c181da6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Reduce and dependency tasks [#144](https://github.com/p2panda/aquadoggo/pull/144) - GraphQL endpoints for replication [#100](https://github.com/p2panda/aquadoggo/pull/100) - Inform materialization service about new operations [#161](https://github.com/p2panda/aquadoggo/pull/161) +- Re-schedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168) ### Changed From 4d950e819f650a93bbe262e306441613595e3f9e Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:25:29 +0200 Subject: [PATCH 21/28] Reassure myself that internally dispatched tasks are also reported --- aquadoggo/src/materializer/worker.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index e7dbc17a8..a0c91a0f2 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -589,21 +589,25 @@ mod tests { } }); - // Define worker and register it - factory.register("test", 1, |_, _| async { Ok(None) }); + // Define workers and register them + factory.register("one", 1, |_, input: Input| async move { + Ok(Some(vec![Task::new("two", input)])) + }); + factory.register("two", 1, |_, _| async { Ok(None) }); // Queue a couple of tasks for i in 0..3 { - factory.queue(Task::new("test", i)); + factory.queue(Task::new("one", i)); } // Wait until work was done .. tokio::time::sleep(Duration::from_millis(100)).await; - assert!(factory.is_empty("test")); + assert!(factory.is_empty("one")); - // We expect a total of 6 recorded status messages: 3 tasks have been scheduled, and 3 - // completed - assert_eq!(messages.lock().unwrap().len(), 6); + // We expect a total of 12 recorded status messages: + // - 3x "one" and 3x "two" tasks have been scheduled + // - 3x "one" and 3x "two" tasks have been completed + assert_eq!(messages.lock().unwrap().len(), 12); } #[tokio::test] From 88e7fc0f0dbff84aa79f98db20b9239100e650cc Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:31:18 +0200 Subject: [PATCH 22/28] Fix grammar --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b1c181da6..3a584836f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Reduce and dependency tasks [#144](https://github.com/p2panda/aquadoggo/pull/144) - GraphQL endpoints for replication [#100](https://github.com/p2panda/aquadoggo/pull/100) - Inform materialization service about new operations [#161](https://github.com/p2panda/aquadoggo/pull/161) -- Re-schedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168) +- Reschedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168) ### Changed From a45d453480f116a5f1006c10ea468b7781c492c8 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Fri, 17 Jun 2022 19:34:18 +0200 Subject: [PATCH 23/28] Add a test for checking for duplicates --- aquadoggo/src/db/stores/task.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index b4ed26afb..df790f8de 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -145,8 +145,8 @@ impl SqlStorage { #[cfg(test)] mod tests { - use p2panda_rs::document::DocumentViewId; - use p2panda_rs::test_utils::fixtures::document_view_id; + use p2panda_rs::document::{DocumentId, DocumentViewId}; + use p2panda_rs::test_utils::fixtures::{document_id, document_view_id}; use rstest::rstest; use crate::db::stores::test_utils::{test_db, TestSqlStore}; @@ -181,4 +181,30 @@ mod tests { let result = db.store.get_tasks().await; assert_eq!(result.unwrap(), vec![]); } + + #[rstest] + #[tokio::test] + async fn avoid_duplicates( + document_id: DocumentId, + #[from(test_db)] + #[future] + db: TestSqlStore, + ) { + let db = db.await; + + // Prepare test data + let task = Task::new("reduce", TaskInput::new(Some(document_id), None)); + + // Insert task + let result = db.store.insert_task(&task).await; + assert!(result.is_ok(), "{:?}", result); + + // Insert the same thing again, it should silently fail + let result = db.store.insert_task(&task).await; + assert!(result.is_ok(), "{:?}", result); + + // Check for duplicates + let result = db.store.get_tasks().await; + assert_eq!(result.unwrap().len(), 1); + } } From 225010db30f2da1d796547fe5f9afd2c65fe610e Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Sat, 18 Jun 2022 01:10:02 +0200 Subject: [PATCH 24/28] Make worker name private --- aquadoggo/src/materializer/tasks/dependency.rs | 4 ++-- aquadoggo/src/materializer/worker.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 3daa18f4c..e9f8c5309 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -177,7 +177,7 @@ mod tests { .unwrap(); assert_eq!(reduce_tasks.len(), expected_next_tasks); for task in reduce_tasks { - assert_eq!(task.0, "reduce") + assert_eq!(task.worker_name(), "reduce") } } } @@ -236,7 +236,7 @@ mod tests { .unwrap(); assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].0, "reduce"); + assert_eq!(tasks[0].worker_name(), "reduce"); } #[rstest] diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index a0c91a0f2..6b87ccb2c 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -92,7 +92,7 @@ use triggered::{Listener, Trigger}; /// A task holding a generic input value and the name of the worker which will process it /// eventually. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct Task(pub WorkerName, IN); +pub struct Task(WorkerName, IN); impl Task { /// Returns a new task. @@ -390,7 +390,7 @@ where match rx.recv().await { // A new task got announced in the broadcast channel! Ok(task) => { - if task.0 != name { + if task.worker_name() != &name { continue; // This is not for us .. } From 709eacb403e4df611cb33a10be3c7e9a7514cb12 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Sat, 18 Jun 2022 01:16:44 +0200 Subject: [PATCH 25/28] Change equality of name in SQL query --- aquadoggo/src/db/stores/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index df790f8de..954bc3071 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -84,7 +84,7 @@ impl SqlStorage { DELETE FROM tasks WHERE - name IS $1 + name = $1 AND document_id IS $2 AND document_view_id IS $3 ", From 18e036714970e0145763ba96ece0c84f34cf32e6 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Mon, 20 Jun 2022 16:16:48 +0200 Subject: [PATCH 26/28] Change method name to on_task_status_change --- aquadoggo/src/materializer/service.rs | 4 ++-- aquadoggo/src/materializer/worker.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 72e019aaf..c323bf0d4 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -42,14 +42,14 @@ pub async fn materializer_service( let on_error = factory.on_error(); // Subscribe to status changes of tasks - let mut on_update = factory.on_update(); + let mut on_task_status_change = factory.on_task_status_change(); let store = context.store.clone(); // Keep track of status changes and persist it in the database. This allows us to pick up // uncompleted tasks next time we start the node. let status_handle = task::spawn(async move { loop { - match on_update.recv().await { + match on_task_status_change.recv().await { Ok(TaskStatus::Pending(task)) => { store .insert_task(&task) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 6b87ccb2c..6741691db 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -350,7 +350,7 @@ where } /// Subscribe to status changes of tasks. - pub fn on_update(&self) -> Receiver> { + pub fn on_task_status_change(&self) -> Receiver> { self.tx_status.subscribe() } @@ -569,7 +569,7 @@ mod tests { } #[tokio::test] - async fn on_update_subscription() { + async fn on_task_status_change_subscription() { type Input = usize; type Data = usize; @@ -580,11 +580,11 @@ mod tests { let messages: Arc>>> = Arc::new(Mutex::new(Vec::new())); // Subscribe to updates and record them - let mut on_update = factory.on_update(); + let mut on_task_status_change = factory.on_task_status_change(); let messages_clone = messages.clone(); tokio::task::spawn(async move { loop { - let message = on_update.recv().await.unwrap(); + let message = on_task_status_change.recv().await.unwrap(); messages_clone.lock().unwrap().push(message); } }); From 821ff15f5f344d3e78ff0feb448c8182bab2d046 Mon Sep 17 00:00:00 2001 From: Vincent Ahrend Date: Wed, 22 Jun 2022 16:01:30 +0200 Subject: [PATCH 27/28] Avoid duplicate task rows using unique index --- .../20220617115933_create-tasks.sql | 15 +++++++-- aquadoggo/src/db/stores/task.rs | 32 +++---------------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/aquadoggo/migrations/20220617115933_create-tasks.sql b/aquadoggo/migrations/20220617115933_create-tasks.sql index db3371e47..b351d722a 100644 --- a/aquadoggo/migrations/20220617115933_create-tasks.sql +++ b/aquadoggo/migrations/20220617115933_create-tasks.sql @@ -3,6 +3,17 @@ CREATE TABLE IF NOT EXISTS tasks ( name TEXT NOT NULL, document_id TEXT NULL, - document_view_id TEXT NULL, - PRIMARY KEY (name, document_id, document_view_id) + document_view_id TEXT NULL ); + +-- Create a unique index using `COALESCE`. A regular `UNIQUE` clause will +-- consider two rows that have at least one `null` value to always be distinct +-- but we want to check for equality including `null` values. +CREATE UNIQUE INDEX ux_tasks ON tasks ( + name, + COALESCE(document_id, 0), + COALESCE(document_view_id, 0) +); + +-- Create an index because primary keys can not contain `null` columns. +CREATE INDEX idx_tasks ON tasks (name, document_id, document_view_id); diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index 954bc3071..5abb3fc2f 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -21,30 +21,10 @@ impl SqlStorage { .as_ref() .map(|view_id| view_id.as_str()); - // Check first if this task already exists, to avoid duplicate rows - let task_row = query_as::<_, TaskRow>( - " - SELECT - name, - document_id, - document_view_id - FROM - tasks - ", - ) - .fetch_optional(&self.pool) - .await - .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; - - // If yes, we are already done here - if task_row.is_some() { - return Ok(()); - } - // Insert task into database - let result = query( + query( " - INSERT INTO + INSERT OR IGNORE INTO tasks ( name, document_id, @@ -61,11 +41,7 @@ impl SqlStorage { .await .map_err(|err| SqlStorageError::Transaction(err.to_string()))?; - if result.rows_affected() != 1 { - Err(SqlStorageError::Insertion("tasks".into())) - } else { - Ok(()) - } + Ok(()) } /// Removes a "pending" task from the database. @@ -85,6 +61,7 @@ impl SqlStorage { tasks WHERE name = $1 + -- Use `IS` because these columns can contain `null` values. AND document_id IS $2 AND document_view_id IS $3 ", @@ -205,6 +182,7 @@ mod tests { // Check for duplicates let result = db.store.get_tasks().await; + // println!("{:?}", result.unwrap()); assert_eq!(result.unwrap().len(), 1); } } From 51174023f64728ea55debe6b2dcf9ac0cc50f417 Mon Sep 17 00:00:00 2001 From: Vincent Ahrend Date: Wed, 22 Jun 2022 16:04:24 +0200 Subject: [PATCH 28/28] Remove print stmt --- aquadoggo/src/db/stores/task.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquadoggo/src/db/stores/task.rs b/aquadoggo/src/db/stores/task.rs index 5abb3fc2f..d1ff1417d 100644 --- a/aquadoggo/src/db/stores/task.rs +++ b/aquadoggo/src/db/stores/task.rs @@ -182,7 +182,6 @@ mod tests { // Check for duplicates let result = db.store.get_tasks().await; - // println!("{:?}", result.unwrap()); assert_eq!(result.unwrap().len(), 1); } }