diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d0d71f2f..8ee833c22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Service error handling, refactor runtime [#92](https://github.com/p2panda/aquadoggo/pull/92) - Refactor module structure, propagate errors in worker to service manager [#97](https://github.com/p2panda/aquadoggo/pull/97) - Restructure storage modules and remove JSON RPC [#101](https://github.com/p2panda/aquadoggo/pull/101) +- Implement new methods required for replication defined by `EntryStore` trait [#102](https://github.com/p2panda/aquadoggo/pull/102) ### Changed diff --git a/Cargo.lock b/Cargo.lock index d0f72b733..5b77dac95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,7 @@ dependencies = [ "http", "hyper", "jsonrpc-v2", + "lipmaa-link 0.2.2", "log", "openssl-probe", "p2panda-rs", @@ -513,7 +514,7 @@ dependencies = [ "blake2b_simd", "ed25519-dalek", "hex", - "lipmaa-link", + "lipmaa-link 0.1.1", "rayon", "serde", "serde_derive", @@ -1029,6 +1030,12 @@ dependencies = [ "const-oid 0.7.1", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.8.1" @@ -1099,6 +1106,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "ecdsa" version = "0.12.4" @@ -1297,6 +1310,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "float-ord" version = "0.3.2" @@ -1331,6 +1353,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d758e60b45e8d749c89c1b389ad8aee550f86aa12e2b9298b546dda7a82ab1" + [[package]] name = "futures" version = "0.3.21" @@ -1946,6 +1974,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8fbee9dd9064a4e7405d9550402d8d0b9ddebec0f31d84c0f9337a1389c6ef9" +[[package]] +name = "lipmaa-link" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f5f0defb4c1af68c2b96ef3c9dee86cc4d113a21e1845fd2c557b36c16fbcb" + [[package]] name = "lock_api" version = "0.4.7" @@ -2065,6 +2099,33 @@ dependencies = [ "winapi", ] +[[package]] +name = "mockall" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d4d70639a72f972725db16350db56da68266ca368b2a1fe26724a903ad3d6b8" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79ef208208a0dea3f72221e26e904cdc6db2e481d9ade89081ddd494f1dbaa6b" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "multer" version = "2.0.2" @@ -2093,6 +2154,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "ntapi" version = "0.3.7" @@ -2247,7 +2314,7 @@ dependencies = [ [[package]] name = "p2panda-rs" version = "0.3.0" -source = "git+https://github.com/p2panda/p2panda?branch=main#a1ab8f4d0dbf7208dde0c28d0ab9542f48315b43" +source = "git+https://github.com/p2panda/p2panda?branch=aquadoggo-wip#8bc8eb5466a881c06265e3d2b4a2ab36d404f1d0" dependencies = [ "arrayvec 0.5.2", "async-trait", @@ -2260,8 +2327,10 @@ dependencies = [ "hex", "js-sys", "lazy_static", + "lipmaa-link 0.2.2", "log", "memory_keystore", + "mockall", "openmls", "openmls_traits", "rand 0.7.3", @@ -2519,6 +2588,36 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "predicates" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5aab5be6e4732b473071984b3164dbbfb7a3674d30ea5ff44410b6bcd960c3c" +dependencies = [ + "difflib", + "float-cmp", + "itertools", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da1c2388b1513e1b605fcec39a95e0a9e8ef088f71443ef37099fa9ae6673fcb" + +[[package]] +name = "predicates-tree" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d86de6de25020a36c6d3643a86d9a6a9f552107c0559c60ea03551b5e16c032" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro-crate" version = "1.1.3" @@ -3390,6 +3489,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507e9898683b6c43a9aa55b64259b721b52ba226e0f3779137e50ad114a4c90b" + [[package]] name = "textwrap" version = "0.11.0" diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index d1762fb1c..b3a5f1118 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -29,12 +29,13 @@ jsonrpc-v2 = { version = "0.10.1", features = [ "easy-errors", "bytes-v05", ], default-features = false } +lipmaa-link = "0.2.2" log = "0.4.14" openssl-probe = "0.1.4" # We can not publish the `aquadoggo` crate yet, since `p2panda-rs` is an # unpublished dependency. # @TODO: This points at a WIP branch in p2panda_rs which is used as long as things are slightly unstable -p2panda-rs = { git = "https://github.com/p2panda/p2panda", branch = "main" } +p2panda-rs = { git = "https://github.com/p2panda/p2panda", branch = "aquadoggo-wip" } rand = "0.8.4" serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.67" diff --git a/aquadoggo/src/db/models/entry.rs b/aquadoggo/src/db/models/entry.rs index dfe5580f4..cc719c76a 100644 --- a/aquadoggo/src/db/models/entry.rs +++ b/aquadoggo/src/db/models/entry.rs @@ -1,9 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::entry::{decode_entry, Entry, EntrySigned}; -use p2panda_rs::operation::OperationEncoded; -use p2panda_rs::storage_provider::errors::ValidationError; -use p2panda_rs::Validate; use serde::Serialize; use sqlx::FromRow; @@ -35,37 +31,3 @@ pub struct EntryRow { /// Sequence number of this entry. pub seq_num: String, } - -impl EntryRow { - pub fn entry_decoded(&self) -> Entry { - // Unwrapping as validation occurs in `EntryWithOperation`. - decode_entry(&self.entry_signed(), self.operation_encoded().as_ref()).unwrap() - } - - pub fn entry_signed(&self) -> EntrySigned { - EntrySigned::new(&self.entry_bytes).unwrap() - } - - pub fn operation_encoded(&self) -> Option { - Some(OperationEncoded::new(&self.payload_bytes.clone().unwrap()).unwrap()) - } -} - -impl Validate for EntryRow { - type Error = ValidationError; - - fn validate(&self) -> Result<(), Self::Error> { - self.entry_signed().validate()?; - if let Some(operation) = self.operation_encoded() { - operation.validate()?; - } - decode_entry(&self.entry_signed(), self.operation_encoded().as_ref())?; - Ok(()) - } -} - -impl AsRef for EntryRow { - fn as_ref(&self) -> &Self { - self - } -} diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index d63c740c2..81130ac37 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,13 +1,13 @@ // SPDX-License-Identifier: AGPL-3.0-or-later - use async_trait::async_trait; +use sqlx::query_scalar; + use p2panda_rs::document::DocumentId; use p2panda_rs::hash::Hash; use p2panda_rs::storage_provider::traits::StorageProvider; -use sqlx::query_scalar; -use crate::db::models::entry::EntryRow; -use crate::db::models::log::LogRow; +use crate::db::stores::entry::StorageEntry; +use crate::db::stores::log::StorageLog; use crate::db::Pool; use crate::errors::StorageProviderResult; use crate::rpc::{EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse}; @@ -18,7 +18,7 @@ pub struct SqlStorage { /// A `StorageProvider` implementation based on `sqlx` that supports SQLite and PostgreSQL databases. #[async_trait] -impl StorageProvider for SqlStorage { +impl StorageProvider for SqlStorage { type EntryArgsResponse = EntryArgsResponse; type EntryArgsRequest = EntryArgsRequest; type PublishEntryResponse = PublishEntryResponse; diff --git a/aquadoggo/src/db/stores/entry.rs b/aquadoggo/src/db/stores/entry.rs index 8e9c6d5f7..5b45fbd3b 100644 --- a/aquadoggo/src/db/stores/entry.rs +++ b/aquadoggo/src/db/stores/entry.rs @@ -1,10 +1,12 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; -use bamboo_rs_core_ed25519_yasmf::lipmaa; +use lipmaa_link::get_lipmaa_links_back_to; +use p2panda_rs::storage_provider::ValidationError; +use p2panda_rs::Validate; use sqlx::{query, query_as}; -use p2panda_rs::entry::{decode_entry, EntrySigned, LogId, SeqNum}; +use p2panda_rs::entry::{decode_entry, Entry, EntrySigned, LogId, SeqNum}; use p2panda_rs::hash::Hash; use p2panda_rs::identity::Author; use p2panda_rs::operation::{Operation, OperationEncoded}; @@ -15,30 +17,70 @@ use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore}; use crate::db::models::entry::EntryRow; use crate::db::provider::SqlStorage; +#[derive(Debug, Clone, PartialEq)] + +pub struct StorageEntry { + entry_signed: EntrySigned, + operation_encoded: OperationEncoded, +} + +impl StorageEntry { + pub fn entry_decoded(&self) -> Entry { + // Unwrapping as validation occurs in `EntryWithOperation`. + decode_entry(self.entry_signed(), self.operation_encoded()).unwrap() + } + + pub fn entry_signed(&self) -> &EntrySigned { + &self.entry_signed + } + + pub fn operation_encoded(&self) -> Option<&OperationEncoded> { + Some(&self.operation_encoded) + } +} + +impl Validate for StorageEntry { + type Error = ValidationError; + + fn validate(&self) -> Result<(), Self::Error> { + self.entry_signed().validate()?; + if let Some(operation) = self.operation_encoded() { + operation.validate()?; + } + decode_entry(self.entry_signed(), self.operation_encoded())?; + Ok(()) + } +} + +impl From for StorageEntry { + fn from(entry_row: EntryRow) -> Self { + // Unwrapping everything here as we assume values coming from the database are valid. + let entry_signed = EntrySigned::new(&entry_row.entry_bytes).unwrap(); + let operation_encoded = OperationEncoded::new(&entry_row.payload_bytes.unwrap()).unwrap(); + StorageEntry::new(&entry_signed, &operation_encoded).unwrap() + } +} + /// Implement `AsStorageEntry` trait for `EntryRow`. -impl AsStorageEntry for EntryRow { +impl AsStorageEntry for StorageEntry { type AsStorageEntryError = EntryStorageError; fn new( entry_signed: &EntrySigned, operation_encoded: &OperationEncoded, ) -> Result { - let entry = decode_entry(entry_signed, Some(operation_encoded)) - .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + let storage_entry = Self { + entry_signed: entry_signed.clone(), + operation_encoded: operation_encoded.clone(), + }; - Ok(Self { - author: entry_signed.author().as_str().into(), - entry_bytes: entry_signed.as_str().into(), - entry_hash: entry_signed.hash().as_str().into(), - log_id: entry.log_id().as_u64().to_string(), - payload_bytes: Some(operation_encoded.as_str().to_string()), - payload_hash: entry_signed.payload_hash().as_str().into(), - seq_num: entry.seq_num().as_u64().to_string(), - }) + storage_entry.validate()?; + + Ok(storage_entry) } fn author(&self) -> Author { - Author::new(self.author.as_ref()).unwrap() + self.entry_signed.author() } fn hash(&self) -> Hash { @@ -67,16 +109,20 @@ impl AsStorageEntry for EntryRow { fn operation(&self) -> Operation { let operation_encoded = self.operation_encoded().unwrap(); - Operation::from(&operation_encoded) + Operation::from(operation_encoded) } } /// Trait which handles all storage actions relating to `Entries`. #[async_trait] -impl EntryStore for SqlStorage { +impl EntryStore for SqlStorage { /// Insert an entry into storage. - async fn insert_entry(&self, entry: EntryRow) -> Result { - let rows_affected = query( + /// + /// Returns a result containing `true` when the insertion occured (one row affected) + /// returns `false` when an unexpected number of rows was affected. Errors when + /// a fatal storage error occured. + async fn insert_entry(&self, entry: StorageEntry) -> Result<(), EntryStorageError> { + let insert_entry_result = query( " INSERT INTO entries ( @@ -101,19 +147,62 @@ impl EntryStore for SqlStorage { .bind(entry.seq_num().as_u64().to_string()) .execute(&self.pool) .await - .map_err(|e| EntryStorageError::Custom(e.to_string()))? - .rows_affected(); + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + if insert_entry_result.rows_affected() != 1 { + return Err(EntryStorageError::Custom(format!( + "Unexpected number of inserts occured for entry with id: {}", + entry.hash() + ))); + } - Ok(rows_affected == 1) + Ok(()) } - /// Returns entry at sequence position within an author's log. - async fn entry_at_seq_num( + /// Get an entry from storage by it's hash id. + /// + /// Returns a result containing the entry wrapped in an option if it was + /// found successfully. Returns `None` if the entry was not found in storage. + /// Errors when a fatal storage error occured. + async fn get_entry_by_hash( + &self, + hash: &Hash, + ) -> Result, EntryStorageError> { + let entry_row = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + entry_hash = $1 + ", + ) + .bind(hash.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entry_row.map(|row| row.into())) + } + + /// Get an entry at a sequence position within an author's log. + /// + /// Returns a result containing the entry wrapped in an option if it was found + /// successfully. Returns None if the entry was not found in storage. Errors when + /// a fatal storage error occured. + async fn get_entry_at_seq_num( &self, author: &Author, log_id: &LogId, seq_num: &SeqNum, - ) -> Result, EntryStorageError> { + ) -> Result, EntryStorageError> { let entry_row = query_as::<_, EntryRow>( " SELECT @@ -139,15 +228,19 @@ impl EntryStore for SqlStorage { .await .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - Ok(entry_row) + Ok(entry_row.map(|row| row.into())) } - /// Returns the latest Bamboo entry of an author's log. - async fn latest_entry( + /// Get the latest entry of an author's log. + /// + /// Returns a result containing the latest log entry wrapped in an option if an + /// entry was found. Returns None if the specified author and log could not be + /// found in storage. Errors when a fatal storage error occured. + async fn get_latest_entry( &self, author: &Author, log_id: &LogId, - ) -> Result, EntryStorageError> { + ) -> Result, EntryStorageError> { let entry_row = query_as::<_, EntryRow>( " SELECT @@ -164,7 +257,7 @@ impl EntryStore for SqlStorage { author = $1 AND log_id = $2 ORDER BY - seq_num DESC + CAST(seq_num AS INTEGER) DESC LIMIT 1 ", @@ -175,11 +268,18 @@ impl EntryStore for SqlStorage { .await .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - Ok(entry_row) + Ok(entry_row.map(|row| row.into())) } - /// Return vector of all entries of a given schema - async fn by_schema(&self, schema: &SchemaId) -> Result, EntryStorageError> { + /// Get all entries of a given schema + /// + /// Returns a result containing a vector of all entries which follow the passed + /// schema (identified by it's `SchemaId`). If no entries exist, or the schema + /// is not known by this node, then an empty vecot is returned. + async fn get_entries_by_schema( + &self, + schema: &SchemaId, + ) -> Result, EntryStorageError> { let entries = query_as::<_, EntryRow>( " SELECT @@ -204,49 +304,362 @@ impl EntryStore for SqlStorage { .await .map_err(|e| EntryStorageError::Custom(e.to_string()))?; - Ok(entries) + Ok(entries.into_iter().map(|row| row.into()).collect()) + } + + /// Get all entries of a given schema + /// + /// Returns a result containing a vector of all entries which follow the passed + /// schema (identified by it's `SchemaId`). If no entries exist, or the schema + /// is not known by this node, then an empty vector is returned. + async fn get_paginated_log_entries( + &self, + author: &Author, + log_id: &LogId, + seq_num: &SeqNum, + max_number_of_entries: usize, + ) -> Result, EntryStorageError> { + let max_seq_num = seq_num.as_u64() as usize + max_number_of_entries - 1; + let entries = query_as::<_, EntryRow>( + " + SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + AND CAST(seq_num AS INTEGER) BETWEEN $3 and $4 + ORDER BY + CAST(seq_num AS INTEGER) + ", + ) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .bind(seq_num.as_u64().to_string()) + .bind((max_seq_num as u64).to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entries.into_iter().map(|row| row.into()).collect()) + } + + /// Get all entries which make up the certificate pool for a specified entry. + /// + /// Returns a result containing a vector of all stored entries which are part + /// the passed entries' certificate pool. Errors if a fatal storage error + /// occurs. + /// + /// It is worth noting that this method doesn't check if the certificate pool + /// is complete, it only returns entries which are part of the pool and found + /// in storage. If an entry was not stored, then the pool may be incomplete. + async fn get_all_skiplink_entries_for_entry( + &self, + author: &Author, + log_id: &LogId, + initial_seq_num: &SeqNum, + ) -> Result, EntryStorageError> { + let cert_pool_seq_nums = get_lipmaa_links_back_to(initial_seq_num.as_u64(), 1) + .iter() + .map(|seq_num| seq_num.to_string()) + .collect::>() + .join(","); + + // Formatting query string in this way as `sqlx` currently + // doesn't support binding list arguments for IN queries. + let sql_str = format!( + "SELECT + author, + entry_bytes, + entry_hash, + log_id, + payload_bytes, + payload_hash, + seq_num + FROM + entries + WHERE + author = $1 + AND log_id = $2 + AND CAST(seq_num AS INTEGER) IN ({}) + ORDER BY + CAST(seq_num AS INTEGER) DESC + ", + cert_pool_seq_nums + ); + + let entries = query_as::<_, EntryRow>(sql_str.as_str()) + .bind(author.as_str()) + .bind(log_id.as_u64().to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| EntryStorageError::Custom(e.to_string()))?; + + Ok(entries.into_iter().map(|row| row.into()).collect()) } } #[cfg(test)] mod tests { - use p2panda_rs::entry::LogId; + use std::convert::TryFrom; + use std::str::FromStr; + + use p2panda_rs::document::DocumentId; + use p2panda_rs::entry::{sign_and_encode, Entry}; + use p2panda_rs::entry::{LogId, SeqNum}; use p2panda_rs::hash::Hash; - use p2panda_rs::identity::Author; + use p2panda_rs::identity::{Author, KeyPair}; + use p2panda_rs::operation::{Operation, OperationEncoded, OperationFields, OperationValue}; use p2panda_rs::schema::SchemaId; - use p2panda_rs::storage_provider::traits::EntryStore; + use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore, StorageProvider}; + use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID}; - use crate::db::provider::SqlStorage; - use crate::test_helpers::initialize_db; + use crate::db::stores::entry::StorageEntry; + use crate::db::stores::test_utils::test_db; + use crate::rpc::EntryArgsRequest; - const TEST_AUTHOR: &str = "1a8a62c5f64eed987326513ea15a6ea2682c256ac57a418c1c92d96787c8b36e"; + #[tokio::test] + async fn insert_entry() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + let log_id = LogId::new(1); + let schema = SchemaId::from_str(TEST_SCHEMA_ID).unwrap(); + + // Derive the document_id by fetching the first entry + let document_id: DocumentId = storage_provider + .get_entry_at_seq_num(&author, &log_id, &SeqNum::new(1).unwrap()) + .await + .unwrap() + .unwrap() + .hash() + .into(); + + let next_entry_args = storage_provider + .get_entry_args(&EntryArgsRequest { + author: author.clone(), + document: Some(document_id.clone()), + }) + .await + .unwrap(); + + let mut fields = OperationFields::new(); + fields + .add("username", OperationValue::Text("stitch".to_owned())) + .unwrap(); + + let update_operation = Operation::new_update( + schema.clone(), + vec![next_entry_args.entry_hash_backlink.clone().unwrap().into()], + fields.clone(), + ) + .unwrap(); + + let update_entry = Entry::new( + &next_entry_args.log_id, + Some(&update_operation), + next_entry_args.entry_hash_skiplink.as_ref(), + next_entry_args.entry_hash_backlink.as_ref(), + &next_entry_args.seq_num, + ) + .unwrap(); + + let entry_encoded = sign_and_encode(&update_entry, &key_pair).unwrap(); + let operation_encoded = OperationEncoded::try_from(&update_operation).unwrap(); + let doggo_entry = StorageEntry::new(&entry_encoded, &operation_encoded).unwrap(); + let result = storage_provider.insert_entry(doggo_entry).await; + + assert!(result.is_ok()) + } + + #[tokio::test] + async fn try_insert_non_unique_entry() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + let log_id = LogId::new(1); + + let first_entry = storage_provider + .get_entry_at_seq_num(&author, &log_id, &SeqNum::new(1).unwrap()) + .await + .unwrap() + .unwrap(); + + let duplicate_doggo_entry = StorageEntry::new( + first_entry.entry_signed(), + first_entry.operation_encoded().unwrap(), + ) + .unwrap(); + let result = storage_provider.insert_entry(duplicate_doggo_entry).await; + + assert_eq!(result.unwrap_err().to_string(), "Error occured during `EntryStorage` request in storage provider: error returned from database: UNIQUE constraint failed: entries.author, entries.log_id, entries.seq_num") + } #[tokio::test] async fn latest_entry() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; + let storage_provider = test_db(100).await; - let author = Author::new(TEST_AUTHOR).unwrap(); + let author_not_in_db = Author::try_from(*KeyPair::new().public_key()).unwrap(); let log_id = LogId::new(1); let latest_entry = storage_provider - .latest_entry(&author, &log_id) + .get_latest_entry(&author_not_in_db, &log_id) .await .unwrap(); assert!(latest_entry.is_none()); + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author_in_db = Author::try_from(*key_pair.public_key()).unwrap(); + + let latest_entry = storage_provider + .get_latest_entry(&author_in_db, &log_id) + .await + .unwrap(); + assert_eq!(latest_entry.unwrap().seq_num(), SeqNum::new(100).unwrap()); } #[tokio::test] async fn entries_by_schema() { - let pool = initialize_db().await; - let storage_provider = SqlStorage { pool }; + let storage_provider = test_db(100).await; - let schema = SchemaId::new_application( + let schema_not_in_the_db = SchemaId::new_application( "venue", &Hash::new_from_bytes(vec![1, 2, 3]).unwrap().into(), ); - let entries = storage_provider.by_schema(&schema).await.unwrap(); - assert!(entries.len() == 0); + let entries = storage_provider + .get_entries_by_schema(&schema_not_in_the_db) + .await + .unwrap(); + assert!(entries.is_empty()); + + let schema_in_the_db = SchemaId::new(TEST_SCHEMA_ID).unwrap(); + + let entries = storage_provider + .get_entries_by_schema(&schema_in_the_db) + .await + .unwrap(); + assert!(entries.len() == 100); + } + + #[tokio::test] + async fn entry_by_seq_num() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + for seq_num in [1, 10, 56, 77, 90] { + let seq_num = SeqNum::new(seq_num).unwrap(); + let entry = storage_provider + .get_entry_at_seq_num(&author, &LogId::new(1), &seq_num) + .await + .unwrap(); + assert_eq!(entry.unwrap().seq_num(), seq_num) + } + + let wrong_log = LogId::new(2); + let entry = storage_provider + .get_entry_at_seq_num(&author, &wrong_log, &SeqNum::new(1).unwrap()) + .await + .unwrap(); + assert!(entry.is_none()); + + let author_not_in_db = Author::try_from(*KeyPair::new().public_key()).unwrap(); + let entry = storage_provider + .get_entry_at_seq_num(&author_not_in_db, &LogId::new(1), &SeqNum::new(1).unwrap()) + .await + .unwrap(); + assert!(entry.is_none()); + + let seq_num_not_in_log = SeqNum::new(1000).unwrap(); + let entry = storage_provider + .get_entry_at_seq_num(&author_not_in_db, &LogId::new(1), &seq_num_not_in_log) + .await + .unwrap(); + assert!(entry.is_none()) + } + + #[tokio::test] + async fn get_entry_by_hash() { + let storage_provider = test_db(100).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + for seq_num in [1, 11, 32, 45, 76] { + let seq_num = SeqNum::new(seq_num).unwrap(); + let entry = storage_provider + .get_entry_at_seq_num(&author, &LogId::new(1), &seq_num) + .await + .unwrap() + .unwrap(); + + let entry_hash = entry.hash(); + let entry_by_hash = storage_provider + .get_entry_by_hash(&entry_hash) + .await + .unwrap() + .unwrap(); + assert_eq!(entry, entry_by_hash) + } + + let entry_hash_not_in_db = Hash::new_from_bytes(vec![1, 2, 3]).unwrap(); + let entry = storage_provider + .get_entry_by_hash(&entry_hash_not_in_db) + .await + .unwrap(); + assert!(entry.is_none()) + } + + #[tokio::test] + async fn gets_next_n_entries_after_seq() { + let storage_provider = test_db(50).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + let entries = storage_provider + .get_paginated_log_entries(&author, &LogId::default(), &SeqNum::default(), 20) + .await + .unwrap(); + for entry in entries.clone() { + assert!(entry.seq_num().as_u64() >= 1 && entry.seq_num().as_u64() <= 20) + } + assert_eq!(entries.len(), 20); + } + + #[tokio::test] + async fn gets_all_skiplink_entries_for_entry() { + let storage_provider = test_db(50).await; + + let key_pair = KeyPair::from_private_key_str(DEFAULT_PRIVATE_KEY).unwrap(); + let author = Author::try_from(*key_pair.public_key()).unwrap(); + + let entries = storage_provider + .get_all_skiplink_entries_for_entry( + &author, + &LogId::default(), + &SeqNum::new(20).unwrap(), + ) + .await + .unwrap(); + + let cert_pool_seq_nums = entries + .iter() + .map(|entry| entry.seq_num().as_u64()) + .collect::>(); + + assert!(!entries.is_empty()); + assert_eq!(cert_pool_seq_nums, vec![19, 18, 17, 13, 4, 1]); } } diff --git a/aquadoggo/src/db/stores/log.rs b/aquadoggo/src/db/stores/log.rs index e66524132..2e730b782 100644 --- a/aquadoggo/src/db/stores/log.rs +++ b/aquadoggo/src/db/stores/log.rs @@ -1,52 +1,61 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::str::FromStr; - use async_trait::async_trait; +use sqlx::{query, query_scalar}; + use p2panda_rs::document::DocumentId; use p2panda_rs::entry::LogId; use p2panda_rs::identity::Author; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::errors::LogStorageError; use p2panda_rs::storage_provider::traits::{AsStorageLog, LogStore}; -use sqlx::{query, query_scalar}; -use crate::db::models::log::LogRow; use crate::db::provider::SqlStorage; -impl AsStorageLog for LogRow { - fn new(author: &Author, schema: &SchemaId, document: &DocumentId, log_id: &LogId) -> Self { +pub struct StorageLog { + author: Author, + log_id: LogId, + document_id: DocumentId, + schema_id: SchemaId, +} + +impl AsStorageLog for StorageLog { + fn new( + author: &Author, + schema_id: &SchemaId, + document_id: &DocumentId, + log_id: &LogId, + ) -> Self { Self { - author: author.as_str().to_string(), - log_id: log_id.as_u64().to_string(), - document: document.as_str().to_string(), - schema: schema.as_str(), + author: author.to_owned(), + log_id: log_id.to_owned(), + document_id: document_id.to_owned(), + schema_id: schema_id.to_owned(), } } fn author(&self) -> Author { - Author::new(&self.author).unwrap() + self.author.clone() } fn id(&self) -> LogId { - LogId::from_str(&self.log_id).unwrap() + self.log_id } fn document_id(&self) -> DocumentId { - let document_id: DocumentId = self.document.parse().unwrap(); - document_id + self.document_id.clone() } fn schema_id(&self) -> SchemaId { - SchemaId::new(&self.schema).unwrap() + self.schema_id.clone() } } -/// Trait which handles all storage actions relating to `LogRow`s. +/// Trait which handles all storage actions relating to `StorageLog`s. #[async_trait] -impl LogStore for SqlStorage { +impl LogStore for SqlStorage { /// Insert a log into storage. - async fn insert_log(&self, log: LogRow) -> Result { + async fn insert_log(&self, log: StorageLog) -> Result { let rows_affected = query( " INSERT INTO @@ -164,9 +173,9 @@ mod tests { AsStorageEntry, AsStorageLog, EntryStore, LogStore, StorageProvider, }; - use crate::db::models::entry::EntryRow; - use crate::db::models::log::LogRow; use crate::db::provider::SqlStorage; + use crate::db::stores::entry::StorageEntry; + use crate::db::stores::log::StorageLog; use crate::test_helpers::{initialize_db, random_entry_hash}; const TEST_AUTHOR: &str = "58223678ab378f1b07d1d8c789e6da01d16a06b1a4d17cc10119a0109181156c"; @@ -195,10 +204,10 @@ mod tests { let schema = SchemaId::new_application("venue", &Hash::new(&random_entry_hash()).unwrap().into()); - let log = LogRow::new(&author, &schema, &document.clone().into(), &LogId::new(1)); + let log = StorageLog::new(&author, &schema, &document.clone().into(), &LogId::new(1)); assert!(storage_provider.insert_log(log).await.is_ok()); - let log = LogRow::new(&author, &schema, &document.into(), &LogId::new(1)); + let log = StorageLog::new(&author, &schema, &document.into(), &LogId::new(1)); assert!(storage_provider.insert_log(log).await.is_err()); } @@ -217,7 +226,7 @@ mod tests { ]), ); - let log = LogRow::new(&author, &schema, &document.into(), &LogId::new(1)); + let log = StorageLog::new(&author, &schema, &document.into(), &LogId::new(1)); assert!(storage_provider.insert_log(log).await.is_ok()); } @@ -253,7 +262,7 @@ mod tests { .await .unwrap(); assert_eq!(LogId::new(n.into()), log_id); - let log = LogRow::new(&author, &schema, &doc, &log_id); + let log = StorageLog::new(&author, &schema, &doc, &log_id); storage_provider.insert_log(log).await.unwrap(); } } @@ -292,12 +301,12 @@ mod tests { None ); - let entry = EntryRow::new(&entry_encoded.clone(), &operation_encoded).unwrap(); + let entry = StorageEntry::new(&entry_encoded.clone(), &operation_encoded).unwrap(); // Store entry in database assert!(storage_provider.insert_entry(entry).await.is_ok()); - let log = LogRow::new( + let log = StorageLog::new( &author, &schema, &entry_encoded.hash().into(), @@ -347,8 +356,8 @@ mod tests { let storage_provider = SqlStorage { pool }; // Register two log ids at the beginning - let log_1 = LogRow::new(&author, &schema, &document_first.into(), &LogId::new(1)); - let log_2 = LogRow::new(&author, &schema, &document_second.into(), &LogId::new(2)); + let log_1 = StorageLog::new(&author, &schema, &document_first.into(), &LogId::new(1)); + let log_2 = StorageLog::new(&author, &schema, &document_second.into(), &LogId::new(2)); storage_provider.insert_log(log_1).await.unwrap(); storage_provider.insert_log(log_2).await.unwrap(); @@ -357,7 +366,7 @@ mod tests { let log_id = storage_provider.next_log_id(&author).await.unwrap(); assert_eq!(log_id, LogId::new(3)); - let log_3 = LogRow::new(&author, &schema, &document_third.into(), &log_id); + let log_3 = StorageLog::new(&author, &schema, &document_third.into(), &log_id); storage_provider.insert_log(log_3).await.unwrap(); @@ -365,7 +374,7 @@ mod tests { let log_id = storage_provider.next_log_id(&author).await.unwrap(); assert_eq!(log_id, LogId::new(4)); - let log_4 = LogRow::new(&author, &schema, &document_forth.into(), &log_id); + let log_4 = StorageLog::new(&author, &schema, &document_forth.into(), &log_id); storage_provider.insert_log(log_4).await.unwrap(); diff --git a/aquadoggo/src/db/stores/mod.rs b/aquadoggo/src/db/stores/mod.rs index 1bef8fc32..9a04686c9 100644 --- a/aquadoggo/src/db/stores/mod.rs +++ b/aquadoggo/src/db/stores/mod.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -mod entry; -mod log; +pub mod entry; +pub mod log; #[cfg(test)] mod test_utils;