From ee66affd7916859cf924214257b2eb4389ad0eef Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Mon, 6 Mar 2023 18:43:42 +0100 Subject: [PATCH 1/6] Introduce requeue flag --- aquadoggo/src/materializer/worker.rs | 85 +++++++++++++++++++--------- 1 file changed, 57 insertions(+), 28 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 1d622eabd..c2b79022c 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -75,7 +75,7 @@ //! //! Task 1 results in "25", Task 2 in "64", Task 4 in "9". //! ``` -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::future::Future; use std::hash::Hash; @@ -139,6 +139,9 @@ pub enum TaskStatus { /// Workers are identified by simple string values. pub type WorkerName = String; +/// Flag for the input index to indicate if we want to requeue the task after it completed. +type RequeueFlag = bool; + /// Every registered worker pool is managed by a `WorkerManager` which holds the task queue for /// this registered work and an index of all current inputs in the task queue. struct WorkerManager @@ -149,7 +152,12 @@ where /// /// This allows us to avoid duplicate tasks by detecting if there is already a task in our /// queue with the same input hash. - input_index: Arc>>, + /// + /// An additional flag can be used to indicate that we want to requeue the same task again + /// after it completed. This is useful to account for more events which arrived _while_ the + /// task was processed. It is enough to only remember one of the potentially many events + /// arriving in this time, we're "batching" them for the next round. + input_index: Arc>>, /// FIFO queue of all tasks for this worker pool. queue: Arc>>, @@ -162,7 +170,7 @@ where /// Returns a new worker manager. 
pub fn new() -> Self { Self { - input_index: Arc::new(Mutex::new(HashSet::new())), + input_index: Arc::new(Mutex::new(HashMap::new())), queue: Arc::new(Queue::new()), } } @@ -405,16 +413,23 @@ where // Check if a task with the same input values already exists in queue match input_index.lock() { Ok(mut index) => { - if index.contains(&task.1) { - continue; // Task already exists - } else { - // Trigger status update - on_pending(task.clone()); - - // Generate a unique id for this new task and add it to queue - let next_id = counter.fetch_add(1, Ordering::Relaxed); - queue.push(QueueItem::new(next_id, task.1.clone())); - index.insert(task.1); + match index.get(&task.1) { + Some(requeue) => { + if *requeue { + continue; + } else { + index.insert(task.1, true); + } + } + None => { + // Trigger status update + on_pending(task.clone()); + + // Generate a unique id for this new task and add it to queue + let next_id = counter.fetch_add(1, Ordering::Relaxed); + queue.push(QueueItem::new(next_id, task.1.clone())); + index.insert(task.1, false); + } } } Err(err) => { @@ -480,21 +495,6 @@ where // Take this task and do work .. 
let result = work.call(context.clone(), item.input()).await; - // Remove input index from queue - match input_index.lock() { - Ok(mut index) => { - index.remove(&item.input()); - } - Err(err) => { - error!( - "Error while locking input index in worker {} for task {:?}: {}", - name, item, err - ); - - error_signal.trigger(); - } - } - // Trigger removing the task from the task store on_complete(item.input()); @@ -525,6 +525,35 @@ where ); } _ => (), // Task succeeded, but nothing to dispatch + }; + + // Remove input index from queue and check if we should requeue that task + let requeue = match input_index.lock() { + Ok(mut index) => match index.remove(&item.input()) { + Some(value) => value, + None => { + error!("Incosistency detected in queue input index"); + error_signal.trigger(); + false + } + }, + Err(err) => { + error!( + "Error while locking input index in worker {} for task {:?}: {}", + name, item, err + ); + + error_signal.trigger(); + false + } + }; + + // Send the task again to dispatcher if requeue flag is set + if requeue { + if let Err(err) = tx.send(Task(name.clone(), item.input())) { + error!("Error while broadcasting task during requeue: {}", err); + error_signal.trigger(); + } } } }); From e86e72a54ee79ae68edbc0c5b4affbfd56cf2804 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Mon, 6 Mar 2023 18:59:06 +0100 Subject: [PATCH 2/6] Write more about re-scheduling --- aquadoggo/src/materializer/worker.rs | 29 +++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index c2b79022c..4a9fb6e19 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -4,10 +4,10 @@ //! processed in worker pools where one worker executes the task. //! //! A task queue allows control over a) order of operations and b) amount of work being done per -//! time c) avoiding duplicate work. +//! 
time c) avoiding concurrent writes to the same data. //! -//! This particular task queue implementation rejects tasks with duplicate input values already -//! waiting in the queue (which would result in doing the same work again). +//! This particular task queue implementation "batches" tasks with duplicate input values, only +//! processing one at a time to avoid them overwriting each other's work. //! //! A worker can be defined by any sort of async function which returns a result, indicating if it //! succeeded, failed or crashed critically. @@ -65,7 +65,8 @@ //! -------------------- //! //! The internal queue of "square" contains now: [{Task 1}, {Task 2}, {Task 4}]. Task 3 got -//! rejected silently as it contains the same input data. +//! internally "batched" as it contains the same input data as Task 1, which means that Task 1 will +//! be re-scheduled after it finishes. //! //! 3. Process tasks //! @@ -73,7 +74,12 @@ //! concurrently. After one of them finishes, the next free worker will eventually take Task 4 from //! the queue and process it. //! -//! Task 1 results in "25", Task 2 in "64", Task 4 in "9". +//! Task 1 results in "25", Task 2 in "64", Task 4 in "9". Later Task 1 gets re-scheduled to +//! account for the duplicate Task 3 (again resulting in "25"). +//! +//! In this example that might look redundant but in a more complex system the input might be the +//! same, but the worker function might have access to a database with possibly diverging state +//! between the tasks. //! ``` use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::future::Future; use std::hash::Hash; @@ -150,7 +156,7 @@ where { /// Index of all current inputs inside the task queue organized in a hash set. /// - /// This allows us to avoid duplicate tasks by detecting if there is already a task in our + /// This allows us to detect duplicate tasks by checking if there is already a task in our /// queue with the same input hash. 
/// /// An additional flag can be used to indicate that we want to requeue the same task again @@ -342,8 +348,8 @@ where /// Queues up a new task in the regarding worker queue. /// - /// Tasks with duplicate input values which already exist in the queue will be silently - /// rejected. + /// Tasks with duplicate input values which already exist in the queue will be internally + /// batched. pub fn queue(&mut self, task: Task) { if let Err(err) = self.tx.send(task) { error!("Error while broadcasting task: {}", err); @@ -416,13 +422,18 @@ where match index.get(&task.1) { Some(requeue) => { if *requeue { + // We observed already one duplicate task coming in, + // let's ignore this one continue; } else { + // This is the first duplicate coming in, let's set + // the requeue flag to indicate that more work needs to + // be done when the current task completes index.insert(task.1, true); } } None => { - // Trigger status update + // This task is completly new! Trigger status update on_pending(task.clone()); // Generate a unique id for this new task and add it to queue From 45e1e5992a749a8ee05da7ede6cb93cdf092b1ce Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Mon, 6 Mar 2023 19:47:56 +0100 Subject: [PATCH 3/6] Only remove task from store when it is done for good --- aquadoggo/src/materializer/worker.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 4a9fb6e19..738590dab 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -506,9 +506,6 @@ where // Take this task and do work .. 
let result = work.call(context.clone(), item.input()).await; - // Trigger removing the task from the task store - on_complete(item.input()); - // Check the result match result { Ok(Some(list)) => { @@ -565,6 +562,9 @@ where error!("Error while broadcasting task during requeue: {}", err); error_signal.trigger(); } + } else { + // Trigger removing the task from the task store + on_complete(item.input()); } } }); From b64ff0b5593568dbe692342ccc8c698829f88a89 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Tue, 7 Mar 2023 13:01:51 +0100 Subject: [PATCH 4/6] Use enum for flag instead --- aquadoggo/src/materializer/worker.rs | 45 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 738590dab..4840c4206 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -145,8 +145,14 @@ pub enum TaskStatus { /// Workers are identified by simple string values. pub type WorkerName = String; -/// Flag for the input index to indicate if we want to requeue the task after it completed. -type RequeueFlag = bool; +/// Flags for queue items to define post-completion actions. +enum PostAction { + /// Moves the completed task into the queue again. + Requeue, + + /// Do nothing after completion. + Idle, +} /// Every registered worker pool is managed by a `WorkerManager` which holds the task queue for /// this registered work and an index of all current inputs in the task queue. @@ -163,7 +169,7 @@ where /// after it completed. This is useful to account for more events which arrived _while_ the /// task was processed. It is enough to only remember one of the potentially many events /// arriving in this time, we're "batching" them for the next round. - input_index: Arc>>, + input_index: Arc>>, /// FIFO queue of all tasks for this worker pool. 
queue: Arc>>, @@ -420,26 +426,28 @@ where match input_index.lock() { Ok(mut index) => { match index.get(&task.1) { - Some(requeue) => { - if *requeue { - // We observed already one duplicate task coming in, - // let's ignore this one - continue; - } else { - // This is the first duplicate coming in, let's set - // the requeue flag to indicate that more work needs to - // be done when the current task completes - index.insert(task.1, true); - } - } None => { - // This task is completly new! Trigger status update + // 1. This task is completly new! We don't work on anything + // similar yet. + // + // Trigger status update: on_pending(task.clone()); // Generate a unique id for this new task and add it to queue let next_id = counter.fetch_add(1, Ordering::Relaxed); queue.push(QueueItem::new(next_id, task.1.clone())); - index.insert(task.1, false); + index.insert(task.1, PostAction::Idle); + } + Some(PostAction::Idle) => { + // 2. This is the first duplicate coming in, let's set the + // requeue flag to indicate that more work needs to be done + // when the current task completes + index.insert(task.1, PostAction::Requeue); + } + Some(PostAction::Requeue) => { + // 3. 
We observed already one duplicate task coming in, let's + // ignore this one + continue; } } } @@ -538,7 +546,8 @@ where // Remove input index from queue and check if we should requeue that task let requeue = match input_index.lock() { Ok(mut index) => match index.remove(&item.input()) { - Some(value) => value, + Some(PostAction::Idle) => false, + Some(PostAction::Requeue) => true, None => { error!("Incosistency detected in queue input index"); error_signal.trigger(); From 3573aab9cb22e40d89b7bdd8cac758d27d4dd08a Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Mon, 13 Mar 2023 11:05:46 +0100 Subject: [PATCH 5/6] Add entry to CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53a7d8676..59fab1c7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Correct use of `sqlx` transactions [#285](https://github.com/p2panda/aquadoggo/pull/285) `rs` +- Correct use of `sqlx` transactions [#285](https://github.com/p2panda/aquadoggo/pull/285) - Fix race-condition of mutably shared static schema store during testing [#269](https://github.com/p2panda/aquadoggo/pull/269) +- Introduce flag to requeue tasks in worker queue, fixes race-condition in materialization logic [#286](https://github.com/p2panda/aquadoggo/pull/286) ## [0.4.0] From 9d98a7e05cfba3eaf3c98f48fe267ffc18521f3f Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 14 Mar 2023 15:03:57 +0000 Subject: [PATCH 6/6] Add logging --- aquadoggo/src/materializer/worker.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 4840c4206..3b76ad819 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -434,6 +434,7 @@ where on_pending(task.clone()); // Generate a unique id for this new task and add it to queue + 
debug!("Sending materializer {} task with input {} to the task queue.", task.worker_name(), task.input()); let next_id = counter.fetch_add(1, Ordering::Relaxed); queue.push(QueueItem::new(next_id, task.1.clone())); index.insert(task.1, PostAction::Idle); @@ -442,11 +443,13 @@ where // 2. This is the first duplicate coming in, let's set the // requeue flag to indicate that more work needs to be done // when the current task completes + debug!("Duplicate materializer {} task already in progress, setting re-queue flag for task with input {} and not adding this task to the queue.", task.worker_name(), task.input()); index.insert(task.1, PostAction::Requeue); } Some(PostAction::Requeue) => { // 3. We observed already one duplicate task coming in, let's // ignore this one + debug!("Materializer {} task with input {} not sent to queue as a task for this document has already been re-queued.", task.worker_name(), task.input()); continue; } }