Skip to content

Commit

Permalink
Merge branch 'main' into debug-pagination-query
Browse files Browse the repository at this point in the history
* main:
  Use `write_all` when writing blob data to file system (#587)
  Use `p2panda-rs` `0.8.0` (#585)
  • Loading branch information
adzialocha committed Nov 15, 2023
2 parents e19acd4 + 1a7cefc commit 325210f
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 50 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use `libp2p` `0.5.3` [#570](https://github.com/p2panda/aquadoggo/pull/570)
- Optimize test data generation methods [#572](https://github.com/p2panda/aquadoggo/pull/572)
- Use `SocketAddr` in network config instead of `MultiAddr` [#576](https://github.com/p2panda/aquadoggo/pull/576)
- Use `p2panda-rs` `0.8.0` [#585](https://github.com/p2panda/aquadoggo/pull/585)

## Fixed

Expand All @@ -36,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Do not panic when blob does not have all pieces yet [#563](https://github.com/p2panda/aquadoggo/pull/563)
- Fix `blob` tasks being triggered too often [#578](https://github.com/p2panda/aquadoggo/pull/578)
- Fix `schema` tasks being triggered too often [#581](https://github.com/p2panda/aquadoggo/pull/581)
- Fix blobs getting corrupted when written to the file system [#587](https://github.com/p2panda/aquadoggo/pull/587)

## [0.5.0]

Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ libp2p = { version = "0.52.3", features = [
"tokio",
"yamux",
"tcp",
"quic"
"quic",
] }
libp2p-allow-block-list = "0.2.0"
# libp2p-quic = { version = "0.9.2", features = ["tokio"] }
lipmaa-link = "0.2.2"
log = "0.4.19"
once_cell = "1.18.0"
openssl-probe = "0.1.5"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "06b2fee74b40c779d85a0ef3b37bab8386b164ca", features = [
"storage-provider",
] }
p2panda-rs = { version = "0.8.0", features = ["storage-provider"] }
rand = "0.8.5"
regex = "1.9.3"
serde = { version = "1.0.152", features = ["derive"] }
Expand Down Expand Up @@ -99,7 +97,7 @@ http = "0.2.9"
hyper = "0.14.19"
libp2p-swarm-test = "0.2.0"
once_cell = "1.17.0"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "06b2fee74b40c779d85a0ef3b37bab8386b164ca", features = [
p2panda-rs = { version = "0.8.0", features = [
"test-utils",
"storage-provider",
] }
Expand Down
15 changes: 5 additions & 10 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,6 @@ async fn insert_document(
#[cfg(test)]
mod tests {
use p2panda_rs::api::next_args;
use p2panda_rs::document::materialization::build_graph;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId};
use p2panda_rs::entry::{LogId, SeqNum};
Expand Down Expand Up @@ -827,7 +826,7 @@ mod tests {

// Build the document from the operations.
let document_builder = DocumentBuilder::from(&operations);
let document = document_builder.build().unwrap();
let (document, _) = document_builder.build().unwrap();

// Insert the document into the store
let result = node.context.store.insert_document(&document).await;
Expand All @@ -843,8 +842,8 @@ mod tests {
.to_owned();

// Build the document with just the first operation.
let document_at_view_1 = document_builder
.build_to_view_id(Some(create_operation.into()))
let (document_at_view_1, _) = document_builder
.build_to_view_id(create_operation.into())
.unwrap();

// Insert it into the store as well.
Expand Down Expand Up @@ -1097,11 +1096,7 @@ mod tests {
.await
.unwrap();
let document_builder = DocumentBuilder::from(&operations);
let sorted_operations = build_graph(&document_builder.operations())
.unwrap()
.sort()
.unwrap()
.sorted();
let (_, sorted_operations) = document_builder.build().unwrap();

// We want to test that a document is updated.
let mut current_operations = Vec::new();
Expand All @@ -1112,7 +1107,7 @@ mod tests {
current_operations.push(operation.clone());

// We build each document.
let document = DocumentBuilder::new(current_operations.clone())
let (document, _) = DocumentBuilder::new(current_operations.clone())
.build()
.expect("Build document");

Expand Down
15 changes: 10 additions & 5 deletions aquadoggo/src/db/stores/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ impl From<&DocumentViewFieldRow> for OperationCursor {

#[cfg(test)]
mod tests {
use p2panda_rs::document::materialization::build_graph;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{DocumentBuilder, DocumentId};
use p2panda_rs::identity::{KeyPair, PublicKey};
Expand Down Expand Up @@ -602,11 +601,17 @@ mod tests {
assert_eq!(operations_by_document_id.len(), 10);

// The operations should be in their topologically sorted order.
let operations = DocumentBuilder::from(&operations_by_document_id).operations();
let expected_operation_order =
build_graph(&operations).unwrap().sort().unwrap().sorted();
let document_builder = DocumentBuilder::from(&operations_by_document_id);
let (_, expected_operation_order) = document_builder.build().unwrap();

assert_eq!(operations, expected_operation_order);
assert_eq!(
operations_by_document_id.len(),
expected_operation_order.len()
);
for i in 0..operations_by_document_id.len() {
let operation_id: &OperationId = operations_by_document_id[i].id();
assert_eq!(operation_id, &expected_operation_order[i].0)
}
});
}

Expand Down
42 changes: 41 additions & 1 deletion aquadoggo/src/materializer/tasks/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult<TaskInp

while let Some(value) = stream.next().await {
match value {
Ok(buf) => file.write(&buf).await.map_err(|err| {
Ok(buf) => file.write_all(&buf).await.map_err(|err| {
TaskError::Critical(format!(
"Error occurred when writing to blob file @ {}: {}",
blob_view_path.display(),
Expand Down Expand Up @@ -172,6 +172,46 @@ mod tests {
})
}

#[rstest]
// Regression test for #587: a multi-megabyte blob must survive materialization
// to the file system without corruption. This requires `write_all` (which loops
// until every byte is flushed) rather than `write` (which may write only part
// of the buffer and silently drop the rest).
fn materializes_larger_blob_to_filesystem(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
// Publish a blob of random bytes, split into 256 kB pieces
let length = 10e6 as u32; // 10MB
let blob_data: Vec<u8> = (0..length).map(|_| rand::random::<u8>()).collect();
let blob_view_id = add_blob(
&mut node,
&blob_data,
256 * 1000,
"application/octet-stream",
&key_pair,
)
.await;

// Run the blob task, which writes the assembled blob to disk
let result = blob_task(
node.context.clone(),
TaskInput::DocumentViewId(blob_view_id.clone()),
)
.await;

// It shouldn't fail
assert!(result.is_ok());
// It should return no extra tasks
assert!(result.unwrap().is_none());

// Construct the expected path to the blob view file
// (blobs are stored under their document view id)
let base_path = &node.context.config.blobs_base_path;
let blob_path = base_path.join(blob_view_id.to_string());

// Read the materialized blob back from the file system
let retrieved_blob_data = fs::read(blob_path).await;

// Number of bytes for the published and materialized blob should be the
// same — with plain `write` a partial write would make the file shorter
assert!(retrieved_blob_data.is_ok());
assert_eq!(blob_data.len(), retrieved_blob_data.unwrap().len());
})
}

#[rstest]
fn rejects_incorrect_schema(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
Expand Down
28 changes: 8 additions & 20 deletions aquadoggo/src/materializer/tasks/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::convert::TryFrom;

use log::{debug, trace};
use p2panda_rs::document::materialization::build_graph;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{Document, DocumentBuilder, DocumentId, DocumentViewId};
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
use p2panda_rs::operation::traits::{AsOperation, WithPublicKey};
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{DocumentStore, EntryStore, LogStore, OperationStore};
Expand Down Expand Up @@ -135,8 +132,8 @@ async fn reduce_document_view<O: AsOperation + WithId<OperationId> + WithPublicK

// Materialize document view
let document_builder: DocumentBuilder = operations.into();
let document = match document_builder.build_to_view_id(Some(document_view_id.to_owned())) {
Ok(document) => {
let document = match document_builder.build_to_view_id(document_view_id.to_owned()) {
Ok((document, _operations)) => {
// If the document was deleted, then we return nothing
debug!(
"Document materialized to view with id: {}",
Expand Down Expand Up @@ -193,8 +190,8 @@ async fn reduce_document<O: AsOperation + WithId<OperationId> + WithPublicKey>(
context: &Context,
operations: &Vec<O>,
) -> Result<Option<Vec<Task<TaskInput>>>, TaskError> {
match Document::try_from(operations) {
Ok(document) => {
match DocumentBuilder::from(operations).build() {
Ok((document, operations)) => {
// Make sure to not materialize and store document view twice
// @TODO: This can be a more efficient storage method. See issue:
// https://github.com/p2panda/aquadoggo/issues/431
Expand All @@ -209,13 +206,9 @@ async fn reduce_document<O: AsOperation + WithId<OperationId> + WithPublicKey>(
return Ok(None);
};

// @TODO: Make sorted operations available after building the document above so we can skip this step.
let operations = DocumentBuilder::from(operations).operations();
let sorted_operations = build_graph(&operations).unwrap().sort().unwrap().sorted();

// Iterate over the sorted document operations and update their sorted index on the
// operations_v1 table.
for (index, (operation_id, _, _)) in sorted_operations.iter().enumerate() {
for (index, (operation_id, _, _)) in operations.iter().enumerate() {
context
.store
.update_operation_index(operation_id, index as i32)
Expand Down Expand Up @@ -289,7 +282,6 @@ async fn reduce_document<O: AsOperation + WithId<OperationId> + WithPublicKey>(

#[cfg(test)]
mod tests {
use p2panda_rs::document::materialization::build_graph;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{
DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId, DocumentViewValue,
Expand Down Expand Up @@ -442,11 +434,7 @@ mod tests {

// Sort the operations into their ready for reducing order
let document_builder = DocumentBuilder::from(&document_operations);
let sorted_document_operations = build_graph(&document_builder.operations())
.unwrap()
.sort()
.unwrap()
.sorted();
let (_, sorted_document_operations) = document_builder.build().unwrap();

// Reduce document to its current view and insert into database
let input = TaskInput::DocumentId(document_id.clone());
Expand Down Expand Up @@ -607,7 +595,7 @@ mod tests {

// Build the document from the operations
let document_builder = DocumentBuilder::from(&operations);
let document = document_builder.build().unwrap();
let (document, _) = document_builder.build().unwrap();

// Issue a reduce task for the document, which also inserts the current view
let input = TaskInput::DocumentId(document_id.clone());
Expand Down
12 changes: 6 additions & 6 deletions aquadoggo/src/test_utils/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,15 @@ pub async fn populate_store(store: &SqlStore, config: &PopulateStoreConfig) -> V
// Conditionally create or update the document.
if let Some(mut document) = current_document {
document
.commit(&published_operation)
.commit(&published_operation.0, &published_operation)
.expect("Failed updating document");
current_document = Some(document);
} else {
current_document = Some(
DocumentBuilder::from(&vec![published_operation])
.build()
.expect("Failed to build document"),
);
let (document, _) = DocumentBuilder::from(&vec![published_operation])
.build()
.expect("Failed to build document");

current_document = Some(document);
}

// Set values used in the next iteration.
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ figment = { version = "0.10.10", features = ["toml", "env"] }
hex = "0.4.3"
libp2p = "0.52.0"
log = "0.4.20"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "06b2fee74b40c779d85a0ef3b37bab8386b164ca" }
p2panda-rs = "0.8.0"
path-clean = "1.0.1"
serde = { version = "1.0.185", features = ["serde_derive"] }
tempfile = "3.7.0"
Expand Down

0 comments on commit 325210f

Please sign in to comment.