Skip to content

Commit

Permalink
Remove quick_commit from materialization service (#450)
Browse files Browse the repository at this point in the history
* Remove `quick_commit` from materialization service

* Update CHANGELOG
  • Loading branch information
sandreae authored Jul 14, 2023
1 parent 1bc3171 commit d9b196b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 181 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use `libp2p` `0.52.0` [#425](https://github.com/p2panda/aquadoggo/pull/425)
- Check for duplicate entries arriving to `Ingest` before consuming [#439](https://github.com/p2panda/aquadoggo/pull/439)
- Replicate entries in their topologically sorted document order [#442](https://github.com/p2panda/aquadoggo/pull/442)
- Remove `quick_commit` from materialization service [#450](https://github.com/p2panda/aquadoggo/pull/450)

### Fixed

Expand Down
185 changes: 4 additions & 181 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

use anyhow::Result;
use log::{debug, warn};
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
use p2panda_rs::storage_provider::traits::OperationStore;
use tokio::task;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::db::types::StorageDocument;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::materializer::tasks::{dependency_task, reduce_task, schema_task};
use crate::materializer::worker::{Factory, Task, TaskStatus};
Expand Down Expand Up @@ -107,45 +104,8 @@ pub async fn materializer_service(

match document_id {
Some(document_id) => {
// Get the document by it's document id.
let document = context
.store
.get_document(&document_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document {}",
document_id
)
});

let mut quick_commit_success = false;

// If a document was found we can try to incrementally update the document.
if document.is_some() {
// Attempt a quick commit of the document.
//
// This succeeds if the operation passed on the bus refers to the documents'
// current view in it's previous field.
if let Some(mut document) = document {
quick_commit_success =
quick_commit(&context, &mut document, &operation_id).await;

// If the commit succeeded and the document isn't now deleted dispatch "dependency" task for the documents new view.
if quick_commit_success && !document.is_deleted() {
factory.queue(Task::new(
"dependency",
TaskInput::DocumentViewId(document.view_id().to_owned()),
))
};
};
}

if !quick_commit_success {
// We couldn't perform a quick commit for this document.
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
Expand Down Expand Up @@ -175,44 +135,6 @@ pub async fn materializer_service(
Ok(())
}

async fn quick_commit(
context: &Context,
document: &mut StorageDocument,
operation_id: &OperationId,
) -> bool {
let operation = context
.store
.get_operation(operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving operation {}",
operation_id
)
})
// An operation should exist for every operation id passed on the bus
.unwrap();

match document.commit(&operation) {
Ok(_) => {
// The quick commit was successful so we now insert the updated document.
context
.store
.insert_document(document)
.await
.unwrap_or_else(|_| {
panic!(
"Failed inserting document with view {} into database",
document.view_id()
)
});
debug!("Incrementally updated document {}", document.view_id());
true
}
Err(_) => false,
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand All @@ -233,12 +155,10 @@ mod tests {
use tokio::task;

use crate::context::Context;
use crate::materializer::service::quick_commit;
use crate::materializer::{Task, TaskInput};
use crate::schema::SchemaProvider;
use crate::test_utils::{
doggo_fields, doggo_schema, populate_and_materialize, populate_store_config, test_runner,
TestNode,
doggo_fields, doggo_schema, populate_store_config, test_runner, TestNode,
};
use crate::Configuration;

Expand Down Expand Up @@ -563,101 +483,4 @@ mod tests {
assert_eq!(document.id(), &entry_encoded.hash().into());
});
}

#[rstest]
fn performs_quick_commit(
#[from(populate_store_config)]
#[with(1, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(move |mut node: TestNode| async move {
let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let key_pair = &key_pairs[0];
let schema = config.schema;
let store = node.context.store.clone();

// Now we create and insert an UPDATE operation for this document which is pointing at
// the root CREATE operation.
let (encoded_entry, _) = send_to_store(
&node.context.store,
&operation(
Some(operation_fields(vec![(
"username",
OperationValue::String("melon".to_string()),
)])),
Some(document_id.as_str().parse().unwrap()),
schema.id().to_owned(),
),
&schema,
key_pair,
)
.await
.unwrap();

// Get the document.
let mut document = store.get_document(&document_id).await.unwrap().unwrap();

// We expect the quick commit to succeed as the new operation is pointing at the
// current document view id.
assert!(quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await);

// Get the document again.
let document = store.get_document(&document_id).await.unwrap().unwrap();
// It should have an updated value.
assert_eq!(
*document.get("username").unwrap(),
"melon".to_string().into()
)
})
}

#[rstest]
fn does_not_performs_quick_commit(
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(move |mut node: TestNode| async move {
let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let key_pair = &key_pairs[0];
let schema = config.schema;
let store = node.context.store.clone();

// Now we create and insert an UPDATE operation for this document which is pointing at
// the root CREATE operation.
let (encoded_entry, _) = send_to_store(
&node.context.store,
&operation(
Some(operation_fields(vec![(
"username",
OperationValue::String("melon".to_string()),
)])),
Some(document_id.as_str().parse().unwrap()),
schema.id().to_owned(),
),
&schema,
key_pair,
)
.await
.unwrap();

// Get the document.
let mut document = store.get_document(&document_id).await.unwrap().unwrap();
// We expect the quick commit to fail as the operation isn't pointing at the current
// document view id.
assert!(
!quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await
);

// Get the document again.
let document = store.get_document(&document_id).await.unwrap().unwrap();
// It should be the original value still.
assert_eq!(
*document.get("username").unwrap(),
"bubu".to_string().into()
)
})
}
}

0 comments on commit d9b196b

Please sign in to comment.