Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce requeue flag #286

Merged
merged 6 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Correct use of `sqlx` transactions [#285](https://github.com/p2panda/aquadoggo/pull/285) `rs`
- Correct use of `sqlx` transactions [#285](https://github.com/p2panda/aquadoggo/pull/285)
- Fix race-condition of mutably shared static schema store during testing [#269](https://github.com/p2panda/aquadoggo/pull/269)
- Introduce flag to requeue tasks in worker queue, fixes race-condition in materialization logic [#286](https://github.com/p2panda/aquadoggo/pull/286)

## [0.4.0]

Expand Down
130 changes: 91 additions & 39 deletions aquadoggo/src/materializer/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
//! processed in worker pools where one worker executes the task.
//!
//! A task queue allows control over a) order of operations and b) amount of work being done per
//! time c) avoiding duplicate work.
//! time c) avoiding concurrent writes to the same data.
//!
//! This particular task queue implementation rejects tasks with duplicate input values already
//! waiting in the queue (which would result in doing the same work again).
//! This particular task queue implementation "batches" tasks with duplicate input values, only
//! processing one at a time to avoid them overwriting each other's work.
//!
//! A worker can be defined by any sort of async function which returns a result, indicating if it
//! succeeded, failed or crashed critically.
Expand Down Expand Up @@ -65,17 +65,23 @@
//! --------------------
//!
//! The internal queue of "square" contains now: [{Task 1}, {Task 2}, {Task 4}]. Task 3 got
//! rejected silently as it contains the same input data.
//! internally "batched" as it contains the same input data as Task 1, which means that Task 1 will
//! be re-scheduled after it finished.
//!
//! 3. Process tasks
//!
//! Worker "a" takes Task 1, worker "b" takes Task 2 from the queue. They both get processed
//! concurrently. After one of them finishes, the next free worker will eventually take Task 4 from
//! the queue and process it.
//!
//! Task 1 results in "25", Task 2 in "64", Task 4 in "9".
//! Task 1 results in "25", Task 2 in "64", Task 4 in "9". Later Task 1 gets re-scheduled to
//! account for the duplicate Task 3 (again resulting in "25").
//!
//! In this example that might look redundant but in a more complex system the input might be the
//! same, but the worker function might have access to a database with possibily diverging state
//! between the tasks.
//! ```
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::hash::Hash;
Expand Down Expand Up @@ -139,6 +145,15 @@ pub enum TaskStatus<IN> {
/// Workers are identified by simple string values.
pub type WorkerName = String;

/// Flags for queue items to define post-completion actions.
enum PostAction {
/// Moves the completed task into the queue again.
Requeue,

/// Do nothing after completion.
Idle,
}

/// Every registered worker pool is managed by a `WorkerManager` which holds the task queue for
/// this registered work and an index of all current inputs in the task queue.
struct WorkerManager<IN>
Expand All @@ -147,9 +162,14 @@ where
{
/// Index of all current inputs inside the task queue organized in a hash set.
///
/// This allows us to avoid duplicate tasks by detecting if there is already a task in our
/// This allows us to detect duplicate tasks by checking if there is already a task in our
/// queue with the same input hash.
input_index: Arc<Mutex<HashSet<IN>>>,
///
/// An additional flag can be used to indicate that we want to requeue the same task again
/// after it completed. This is useful to account for more events which arrived _while_ the
/// task was processed. It is enough to only remember one of the potentially many events
/// arriving in this time, we're "batching" them for the next round.
input_index: Arc<Mutex<HashMap<IN, PostAction>>>,

/// FIFO queue of all tasks for this worker pool.
queue: Arc<Queue<QueueItem<IN>>>,
Expand All @@ -162,7 +182,7 @@ where
/// Returns a new worker manager.
pub fn new() -> Self {
Self {
input_index: Arc::new(Mutex::new(HashSet::new())),
input_index: Arc::new(Mutex::new(HashMap::new())),
queue: Arc::new(Queue::new()),
}
}
Expand Down Expand Up @@ -334,8 +354,8 @@ where

/// Queues up a new task in the regarding worker queue.
///
/// Tasks with duplicate input values which already exist in the queue will be silently
/// rejected.
/// Tasks with duplicate input values which already exist in the queue will be internally
/// batched.
pub fn queue(&mut self, task: Task<IN>) {
if let Err(err) = self.tx.send(task) {
error!("Error while broadcasting task: {}", err);
Expand Down Expand Up @@ -405,16 +425,33 @@ where
// Check if a task with the same input values already exists in queue
match input_index.lock() {
Ok(mut index) => {
if index.contains(&task.1) {
continue; // Task already exists
} else {
// Trigger status update
on_pending(task.clone());

// Generate a unique id for this new task and add it to queue
let next_id = counter.fetch_add(1, Ordering::Relaxed);
queue.push(QueueItem::new(next_id, task.1.clone()));
index.insert(task.1);
match index.get(&task.1) {
None => {
// 1. This task is completly new! We don't work on anything
// similar yet.
//
// Trigger status update:
on_pending(task.clone());

// Generate a unique id for this new task and add it to queue
debug!("Sending materializer {} task with input {} to the task queue.", task.worker_name(), task.input());
let next_id = counter.fetch_add(1, Ordering::Relaxed);
queue.push(QueueItem::new(next_id, task.1.clone()));
index.insert(task.1, PostAction::Idle);
}
Some(PostAction::Idle) => {
// 2. This is the first duplicate coming in, let's set the
// requeue flag to indicate that more work needs to be done
// when the current task completes
debug!("Duplicate materializer {} task already in progress, setting re-queue flag for task with input {} and not adding this task to the queue.", task.worker_name(), task.input());
index.insert(task.1, PostAction::Requeue);
}
Some(PostAction::Requeue) => {
// 3. We observed already one duplicate task coming in, let's
// ignore this one
debug!("Materializer {} task with input {} not sent to queue as a task for this document has already been re-queued.", task.worker_name(), task.input());
continue;
}
}
}
Err(err) => {
Expand Down Expand Up @@ -480,24 +517,6 @@ where
// Take this task and do work ..
let result = work.call(context.clone(), item.input()).await;

// Remove input index from queue
match input_index.lock() {
Ok(mut index) => {
index.remove(&item.input());
}
Err(err) => {
error!(
"Error while locking input index in worker {} for task {:?}: {}",
name, item, err
);

error_signal.trigger();
}
}

// Trigger removing the task from the task store
on_complete(item.input());

// Check the result
match result {
Ok(Some(list)) => {
Expand Down Expand Up @@ -525,6 +544,39 @@ where
);
}
_ => (), // Task succeeded, but nothing to dispatch
};

// Remove input index from queue and check if we should requeue that task
let requeue = match input_index.lock() {
Ok(mut index) => match index.remove(&item.input()) {
Some(PostAction::Idle) => false,
Some(PostAction::Requeue) => true,
None => {
error!("Incosistency detected in queue input index");
error_signal.trigger();
false
}
},
Err(err) => {
error!(
"Error while locking input index in worker {} for task {:?}: {}",
name, item, err
);

error_signal.trigger();
false
}
};

// Send the task again to dispatcher if requeue flag is set
if requeue {
if let Err(err) = tx.send(Task(name.clone(), item.input())) {
error!("Error while broadcasting task during requeue: {}", err);
error_signal.trigger();
}
} else {
// Trigger removing the task from the task store
on_complete(item.input());
}
}
});
Expand Down