Reschedule pending tasks during startup #168

Merged · 28 commits · Jun 22, 2022
Changes from 25 commits

Commits
1265022
Prepare pending_tasks method
adzialocha Jun 17, 2022
ed25e10
Add tasks table
adzialocha Jun 17, 2022
2dd9e93
Add channel to inform about task status changes
adzialocha Jun 17, 2022
cf772ba
Fix SQL syntax error
adzialocha Jun 17, 2022
511c722
Nicer error messages
adzialocha Jun 17, 2022
e61a1c6
Move TODO comment into right match arm
adzialocha Jun 17, 2022
5e02d4f
Adding some more docstrings
adzialocha Jun 17, 2022
4270a36
Remove debug statement
adzialocha Jun 17, 2022
cc36001
Implement SQL queries to deal with tasks in database
adzialocha Jun 17, 2022
9c04573
Clippy happy, coder happy
adzialocha Jun 17, 2022
1114cb1
Minor change
adzialocha Jun 17, 2022
72a454f
Add tests for store, fix bugs
adzialocha Jun 17, 2022
a53e0a3
Add another test for on_update, fix bugs
adzialocha Jun 17, 2022
db73617
Silently fail when subscribers are missing
adzialocha Jun 17, 2022
fe5f97f
Add a test for materializer service
adzialocha Jun 17, 2022
7e61ee4
Happy clippy, happy developer
adzialocha Jun 17, 2022
2600052
Make tasks unique, check duplicates before insertion
adzialocha Jun 17, 2022
f27656e
Add a test for re-scheduled tasks
adzialocha Jun 17, 2022
e9966d8
Happy peppi
adzialocha Jun 17, 2022
85472b1
Add entry to CHANGELOG.md
adzialocha Jun 17, 2022
4d950e8
Reassure myself that internally dispatched tasks are also reported
adzialocha Jun 17, 2022
88e7fc0
Fix grammar
adzialocha Jun 17, 2022
a45d453
Add a test for checking for duplicates
adzialocha Jun 17, 2022
225010d
Make worker name private
adzialocha Jun 17, 2022
709eacb
Change equality of name in SQL query
adzialocha Jun 17, 2022
18e0367
Change method name to on_task_status_change
adzialocha Jun 20, 2022
821ff15
Avoid duplicate task rows using unique index
cafca Jun 22, 2022
5117402
Remove print stmt
cafca Jun 22, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Reduce and dependency tasks [#144](https://github.com/p2panda/aquadoggo/pull/144)
- GraphQL endpoints for replication [#100](https://github.com/p2panda/aquadoggo/pull/100)
- Inform materialization service about new operations [#161](https://github.com/p2panda/aquadoggo/pull/161)
- Reschedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168)

### Changed

8 changes: 8 additions & 0 deletions aquadoggo/migrations/20220617115933_create-tasks.sql
@@ -0,0 +1,8 @@
-- SPDX-License-Identifier: AGPL-3.0-or-later

CREATE TABLE IF NOT EXISTS tasks (
name TEXT NOT NULL,
document_id TEXT NULL,
document_view_id TEXT NULL,
PRIMARY KEY (name, document_id, document_view_id)

Member:

I was just stumbling over this primary key with NULL columns. If I understand correctly, this is actually only possible in SQLite:

According to the SQL standard, PRIMARY KEY should always imply NOT NULL. Unfortunately, due to a bug in some early versions, this is not the case in SQLite. Unless the column is an INTEGER PRIMARY KEY or the table is a WITHOUT ROWID table or a STRICT table or the column is declared NOT NULL, SQLite allows NULL values in a PRIMARY KEY column. SQLite could be fixed to conform to the standard, but doing so might break legacy applications. Hence, it has been decided to merely document the fact that SQLite allows NULLs in most PRIMARY KEY columns.

https://www.sqlite.org/lang_createtable.html

);
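
To make the quoted behaviour concrete, here is a hypothetical pair of inserts (not part of this migration) that SQLite accepts under the schema above: NULLs are allowed in the composite primary key, and two NULLs never compare as equal for uniqueness, so identical-looking rows can coexist.

```sql
-- Hypothetical rows, for illustration only: both inserts succeed in SQLite,
-- because NULL document_id values are treated as distinct from each other.
INSERT INTO tasks (name, document_id, document_view_id)
VALUES ('reduce', NULL, '0020aaaa');
INSERT INTO tasks (name, document_id, document_view_id)
VALUES ('reduce', NULL, '0020aaaa');
-- The table now contains two rows that only differ by their hidden rowid.
```
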
20 changes: 16 additions & 4 deletions aquadoggo/src/db/errors.rs
@@ -1,9 +1,21 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use p2panda_rs::{
document::{DocumentId, DocumentViewId},
schema::{system::SystemSchemaError, SchemaError, SchemaIdError},
};
use p2panda_rs::document::{DocumentId, DocumentViewId};
use p2panda_rs::schema::system::SystemSchemaError;
use p2panda_rs::schema::{SchemaError, SchemaIdError};

/// `SQLStorage` errors.
#[derive(thiserror::Error, Debug)]
pub enum SqlStorageError {
#[error("SQL query failed: {0}")]
Transaction(String),

#[error("Insertion of row into table {0} did not show any effect")]
Insertion(String),

#[error("Deletion of row from table {0} did not show any effect")]
Deletion(String),
}

/// `DocumentStore` errors.
#[derive(thiserror::Error, Debug)]
2 changes: 2 additions & 0 deletions aquadoggo/src/db/models/mod.rs
@@ -4,7 +4,9 @@ pub mod document;
mod entry;
mod log;
mod operation;
mod task;

pub use self::log::LogRow;
pub use entry::EntryRow;
pub use operation::{OperationFieldsJoinedRow, OperationRow};
pub use task::TaskRow;
20 changes: 20 additions & 0 deletions aquadoggo/src/db/models/task.rs
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use serde::Serialize;
use sqlx::FromRow;

/// Representation of a row from the `tasks` table as stored in the database.
///
/// This table holds all "pending" tasks of the materialization service worker.
#[derive(FromRow, Debug, Serialize, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TaskRow {
/// Name of the task worker.
pub name: String,

/// `DocumentId` of the task input.
pub document_id: Option<String>,

/// `DocumentViewId` of the task input.
pub document_view_id: Option<String>,
}
6 changes: 2 additions & 4 deletions aquadoggo/src/db/provider.rs
@@ -1,14 +1,12 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use async_trait::async_trait;
use sqlx::query_scalar;

use p2panda_rs::document::DocumentId;
use p2panda_rs::hash::Hash;
use p2panda_rs::storage_provider::traits::StorageProvider;
use sqlx::query_scalar;

use crate::db::stores::StorageEntry;
use crate::db::stores::StorageLog;
use crate::db::stores::{StorageEntry, StorageLog};
use crate::db::Pool;
use crate::errors::StorageProviderResult;
use crate::graphql::client::{
1 change: 1 addition & 0 deletions aquadoggo/src/db/stores/mod.rs
@@ -5,6 +5,7 @@ mod entry;
mod log;
mod operation;
mod schema;
mod task;
#[cfg(test)]
pub mod test_utils;

210 changes: 210 additions & 0 deletions aquadoggo/src/db/stores/task.rs
@@ -0,0 +1,210 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::Result;
use p2panda_rs::document::{DocumentId, DocumentViewId};
use sqlx::{query, query_as};

use crate::db::errors::SqlStorageError;
use crate::db::models::TaskRow;
use crate::db::provider::SqlStorage;
use crate::materializer::{Task, TaskInput};

/// Methods to interact with the `tasks` table in the database.
impl SqlStorage {
/// Inserts a "pending" task into the database.
pub async fn insert_task(&self, task: &Task<TaskInput>) -> Result<(), SqlStorageError> {
// Convert task input to correct database types
let task_input = task.input();
let document_id = task_input.document_id.as_ref().map(|id| id.as_str());
let document_view_id = task_input
.document_view_id
.as_ref()
.map(|view_id| view_id.as_str());

// Check first if this task already exists, to avoid duplicate rows
let task_row = query_as::<_, TaskRow>(
"
SELECT
name,
document_id,
document_view_id
FROM
tasks
WHERE
name = $1
AND document_id IS $2
AND document_view_id IS $3
",
)
.bind(task.worker_name())
.bind(document_id)
.bind(document_view_id)
.fetch_optional(&self.pool)
.await
.map_err(|err| SqlStorageError::Transaction(err.to_string()))?;

// If yes, we are already done here
if task_row.is_some() {
return Ok(());
}

Member:

We could spare this query with a unique constraint on the tasks table, couldn't we?

Member Author (@adzialocha, Jun 17, 2022):

Yes and no 😅 When the pending tasks are loaded into the queue during rescheduling, they are not being removed from the database (yet). The worker itself fires on_update with a new TaskStatus::Pending as soon as the task gets queued, which leads to a new row in the tasks table and ultimately to a duplicate.

The worker makes sure that no duplicates exist in its queues, but that's not the case for the database, which is why I've added that check before insertion.

I guess one could take the rows out of the database after we run get_tasks and before we queue them up. Or we need a new Factory::reschedule method, used for rescheduling tasks, which does not fire TaskStatus::Pending? 🤔
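
As an aside, the later commit "Avoid duplicate task rows using unique index" points in the direction discussed here. Below is a minimal sketch of how such an index could look in SQLite, assuming a COALESCE-based expression index (since SQLite also treats NULLs as distinct in UNIQUE indexes); the exact definition used in that commit is not shown in this diff view.

```sql
-- Sketch only: enforce uniqueness at the database level despite nullable
-- columns. A plain UNIQUE index over (name, document_id, document_view_id)
-- would not catch rows with NULLs, because SQLite considers NULLs distinct,
-- so the nullable columns are coalesced to a sentinel value first.
CREATE UNIQUE INDEX IF NOT EXISTS tasks_unique
ON tasks (name, COALESCE(document_id, ''), COALESCE(document_view_id, ''));

-- With the index in place, the pre-insert SELECT could be dropped and the
-- insert made idempotent instead:
INSERT OR IGNORE INTO tasks (name, document_id, document_view_id)
VALUES ($1, $2, $3);
```
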


// Insert task into database
let result = query(
"
INSERT INTO
tasks (
name,
document_id,
document_view_id
)
VALUES
($1, $2, $3)
",
)
.bind(task.worker_name())
.bind(document_id)
.bind(document_view_id)
.execute(&self.pool)
.await
.map_err(|err| SqlStorageError::Transaction(err.to_string()))?;

if result.rows_affected() != 1 {
Err(SqlStorageError::Insertion("tasks".into()))
} else {
Ok(())
}
}

/// Removes a "pending" task from the database.
pub async fn remove_task(&self, task: &Task<TaskInput>) -> Result<(), SqlStorageError> {
// Convert task input to correct database types
let task_input = task.input();
let document_id = task_input.document_id.as_ref().map(|id| id.as_str());
let document_view_id = task_input
.document_view_id
.as_ref()
.map(|view_id| view_id.as_str());

// Remove task from database
let result = query(
"
DELETE FROM
tasks
WHERE
name = $1
AND document_id IS $2
AND document_view_id IS $3
",
)
.bind(task.worker_name())
.bind(document_id)
.bind(document_view_id)
.execute(&self.pool)
.await
.map_err(|err| SqlStorageError::Transaction(err.to_string()))?;

if result.rows_affected() != 1 {
Err(SqlStorageError::Deletion("tasks".into()))
} else {
Ok(())
}
}

/// Returns "pending" tasks of the materialization service worker.
pub async fn get_tasks(&self) -> Result<Vec<Task<TaskInput>>, SqlStorageError> {
let task_rows = query_as::<_, TaskRow>(
"
SELECT
name,
document_id,
document_view_id
FROM
tasks
",
)
.fetch_all(&self.pool)
.await
.map_err(|err| SqlStorageError::Transaction(err.to_string()))?;

// Convert database rows into correct p2panda types
let mut tasks: Vec<Task<TaskInput>> = Vec::new();
for task in task_rows {
let document_id: Option<DocumentId> = task.document_id.map(|id| {
id.parse()
.unwrap_or_else(|_| panic!("Invalid document id stored in database {}", id))
});

let document_view_id: Option<DocumentViewId> = task.document_view_id.map(|view_id| {
view_id.parse().unwrap_or_else(|_| {
panic!("Invalid document view id stored in database: {}", view_id)
})
});

tasks.push(Task::new(
&task.name,
TaskInput::new(document_id, document_view_id),
));
}

Ok(tasks)
}
}

#[cfg(test)]
mod tests {
use p2panda_rs::document::{DocumentId, DocumentViewId};
use p2panda_rs::test_utils::fixtures::{document_id, document_view_id};
use rstest::rstest;

use crate::db::stores::test_utils::{test_db, TestSqlStore};
use crate::materializer::{Task, TaskInput};

#[rstest]
#[tokio::test]
async fn insert_get_remove_tasks(
document_view_id: DocumentViewId,
#[from(test_db)]
#[future]
db: TestSqlStore,
) {
let db = db.await;

// Prepare test data
let task = Task::new("reduce", TaskInput::new(None, Some(document_view_id)));

// Insert task
let result = db.store.insert_task(&task).await;
assert!(result.is_ok(), "{:?}", result);

// Check if task exists in database
let result = db.store.get_tasks().await;
assert_eq!(result.unwrap(), vec![task.clone()]);

// Remove task
let result = db.store.remove_task(&task).await;
assert!(result.is_ok(), "{:?}", result);

// Check if all tasks got removed
let result = db.store.get_tasks().await;
assert_eq!(result.unwrap(), vec![]);
}

#[rstest]
#[tokio::test]
async fn avoid_duplicates(
document_id: DocumentId,
#[from(test_db)]
#[future]
db: TestSqlStore,
) {
let db = db.await;

// Prepare test data
let task = Task::new("reduce", TaskInput::new(Some(document_id), None));

// Insert task
let result = db.store.insert_task(&task).await;
assert!(result.is_ok(), "{:?}", result);

// Insert the same thing again, it should silently fail
let result = db.store.insert_task(&task).await;
assert!(result.is_ok(), "{:?}", result);

// Check for duplicates
let result = db.store.get_tasks().await;
assert_eq!(result.unwrap().len(), 1);
}
}
4 changes: 2 additions & 2 deletions aquadoggo/src/db/stores/test_utils.rs
@@ -168,8 +168,8 @@ pub async fn insert_entry_operation_and_view(
(document_id, document_view_id)
}

/// Container for `SqlStore` with access to the document ids and key_pairs
/// used in the pre-populated database for testing.
/// Container for `SqlStore` with access to the document ids and key_pairs used in the
/// pre-populated database for testing.
pub struct TestSqlStore {
pub store: SqlStorage,
pub key_pairs: Vec<KeyPair>,
1 change: 0 additions & 1 deletion aquadoggo/src/db/utils.rs
@@ -328,7 +328,6 @@ pub fn parse_document_view_field_rows(

#[cfg(test)]
mod tests {

use p2panda_rs::document::DocumentViewValue;
use p2panda_rs::operation::{
AsOperation, OperationId, OperationValue, PinnedRelation, PinnedRelationList, Relation,
12 changes: 9 additions & 3 deletions aquadoggo/src/graphql/client/mutation.rs
@@ -65,9 +65,15 @@ impl ClientMutationRoot {

// Send new operation on service communication bus, this will arrive eventually at
// the materializer service
tx.send(ServiceMessage::NewOperation(
verified_operation.operation_id().to_owned(),
))?;
if tx
.send(ServiceMessage::NewOperation(
verified_operation.operation_id().to_owned(),
))
.is_err()
{
// Silently fail here as we don't mind if there are no subscribers. We have
// tests in other places to check if messages arrive.
}

Ok(response)
}
10 changes: 10 additions & 0 deletions aquadoggo/src/materializer/input.rs
@@ -2,13 +2,23 @@

use p2panda_rs::document::{DocumentId, DocumentViewId};

/// Input of every task worker containing all information we need to process.
///
/// The workers are designed such that they EITHER await a `DocumentId` OR a `DocumentViewId`.
/// Setting both values `None` or both values `Some` will be rejected.
#[derive(Clone, Eq, PartialEq, Debug, Hash)]
pub struct TaskInput {
/// Specifying a `DocumentId`, indicating that we're interested in processing the "latest"
/// state of that document.
pub document_id: Option<DocumentId>,

/// Specifying a `DocumentViewId`, indicating that we're interested in processing the state of
/// that document view at this point.
pub document_view_id: Option<DocumentViewId>,
}

impl TaskInput {
/// Returns a new instance of `TaskInput`.
pub fn new(document_id: Option<DocumentId>, document_view_id: Option<DocumentViewId>) -> Self {
Self {
document_id,
1 change: 1 addition & 0 deletions aquadoggo/src/materializer/mod.rs
@@ -7,3 +7,4 @@ mod worker;

pub use input::TaskInput;
pub use service::materializer_service;
pub use worker::Task;