diff --git a/Cargo.lock b/Cargo.lock index c451d75..d036e9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3110,7 +3110,7 @@ dependencies = [ [[package]] name = "stelae" -version = "0.3.0-alpha.3" +version = "0.3.0-alpha.4" dependencies = [ "actix-http", "actix-service", @@ -3124,6 +3124,7 @@ dependencies = [ "git2", "just", "lazy_static", + "md-5", "mime", "mime_guess", "regex", diff --git a/Cargo.toml b/Cargo.toml index 04d6dd0..c7956d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "stelae" description = "A collection of tools in Rust and Python for preserving, authenticating, and accessing laws in perpetuity." -version = "0.3.0-alpha.3" +version = "0.3.0-alpha.4" edition = "2021" readme = "README.md" license = "AGPL-3.0" @@ -16,6 +16,7 @@ actix-web = "4" actix-service = "2.0" actix-http = "3.2" async-trait = "0.1.77" +md-5 = "0.10.6" mime = "0.3.17" mime_guess = "2.0.4" anyhow = "1.0" diff --git a/migrations/sqlite/20240115152953_initial_db.down.sql b/migrations/sqlite/20240115152953_initial_db.down.sql index a6e8623..f6141c9 100644 --- a/migrations/sqlite/20240115152953_initial_db.down.sql +++ b/migrations/sqlite/20240115152953_initial_db.down.sql @@ -12,8 +12,8 @@ DROP TABLE IF EXISTS publication_has_publication_versions; DROP TABLE IF EXISTS publication_version; DROP TABLE IF EXISTS publication; DROP TABLE IF EXISTS version; -DROP TABLE IF EXISTS library_document; DROP TABLE IF EXISTS library; +DROP TABLE IF EXISTS document_element; DROP TABLE IF EXISTS document; DROP TABLE IF EXISTS stele; diff --git a/migrations/sqlite/20240115152953_initial_db.up.sql b/migrations/sqlite/20240115152953_initial_db.up.sql index 77e5fad..a681473 100644 --- a/migrations/sqlite/20240115152953_initial_db.up.sql +++ b/migrations/sqlite/20240115152953_initial_db.up.sql @@ -7,113 +7,98 @@ CREATE TABLE stele ( CREATE TABLE document ( doc_id TEXT PRIMARY KEY ); -CREATE TABLE library ( - mpath TEXT PRIMARY KEY -); -CREATE TABLE library_document ( - 
collection_mpath TEXT, +CREATE TABLE document_element ( + doc_mpath TEXT, + url TEXT, doc_id TEXT, - start DATE, - end DATE, - CONSTRAINT fk_coll_mpath - FOREIGN KEY (collection_mpath) - REFERENCES library(mpath), CONSTRAINT fk_doc_id FOREIGN KEY (doc_id) REFERENCES document(doc_id), - PRIMARY KEY (collection_mpath, doc_id) + PRIMARY KEY (doc_mpath) +); +CREATE TABLE library ( + mpath TEXT PRIMARY KEY, + url TEXT ); CREATE TABLE publication ( + id TEXT, name TEXT, date INTEGER, stele TEXT, revoked INTEGER, - last_valid_publication_name TEXT, + last_valid_publication_id TEXT, last_valid_version TEXT, CONSTRAINT fk_last_valid_version FOREIGN KEY (last_valid_version) REFERENCES version(codified_date), CONSTRAINT fk_last_valid_publication - FOREIGN KEY (last_valid_publication_name, stele) - REFERENCES publication(name, stele), + FOREIGN KEY (last_valid_publication_id) + REFERENCES publication(id), CONSTRAINT fk_stele FOREIGN KEY (stele) REFERENCES stele(name) ON DELETE CASCADE, - PRIMARY KEY (name, stele) + PRIMARY KEY (id) +); +CREATE TABLE version( + codified_date TEXT PRIMARY KEY ); CREATE TABLE publication_version ( + id TEXT, version TEXT, - publication TEXT, - stele TEXT, + publication_id TEXT, build_reason TEXT, CONSTRAINT fk_publication - FOREIGN KEY (publication, stele) - REFERENCES publication(name, stele) + FOREIGN KEY (publication_id) + REFERENCES publication(id) ON DELETE CASCADE, CONSTRAINT fk_version FOREIGN KEY (version) REFERENCES version(codified_date), - PRIMARY KEY (publication, version, stele) + PRIMARY KEY (id) ); CREATE TABLE publication_has_publication_versions ( - publication TEXT, - referenced_publication TEXT, - referenced_version TEXT, - stele TEXT, - CONSTRAINT fk_publication FOREIGN KEY (publication, stele) REFERENCES publication(name, stele) ON DELETE CASCADE, - CONSTRAINT fk_referenced_publication FOREIGN KEY (referenced_publication, referenced_version, stele) REFERENCES publication_version(publication, version, stele) ON DELETE CASCADE, 
- PRIMARY KEY (publication, referenced_publication, referenced_version, stele) -); -CREATE TABLE version( - codified_date TEXT PRIMARY KEY + publication_id TEXT, + publication_version_id TEXT, + CONSTRAINT fk_publication FOREIGN KEY (publication_id) REFERENCES publication(id) ON DELETE CASCADE, + CONSTRAINT fk_referenced_publication_version FOREIGN KEY (publication_version_id) REFERENCES publication_version(id) ON DELETE CASCADE, + PRIMARY KEY (publication_id, publication_version_id) ); CREATE TABLE document_change ( - doc_mpath TEXT, - status TEXT, - url TEXT, + id TEXT, + status INTEGER, change_reason TEXT, - publication TEXT, - version TEXT, - stele TEXT, - doc_id TEXT, - CONSTRAINT fk_doc_id - FOREIGN KEY (doc_id) - REFERENCES document(doc_id) + publication_version_id TEXT, + doc_mpath TEXT, + CONSTRAINT fk_doc_el + FOREIGN KEY (doc_mpath) + REFERENCES document_element(doc_mpath) ON DELETE CASCADE, CONSTRAINT fk_publication_version - FOREIGN KEY (publication, version, stele) - REFERENCES publication_version(publication, version, stele) + FOREIGN KEY (publication_version_id) + REFERENCES publication_version(id) ON DELETE CASCADE, - PRIMARY KEY (doc_mpath, status, publication, version, stele) + PRIMARY KEY (id) ); CREATE INDEX document_change_doc_mpath_idx ON document_change(doc_mpath COLLATE NOCASE); CREATE TABLE library_change ( - publication TEXT, - version TEXT, - stele TEXT, + publication_version_id TEXT, status TEXT, library_mpath TEXT, - url TEXT, CONSTRAINT fk_publication_version - FOREIGN KEY (publication, version, stele) - REFERENCES publication_version(publication, version, stele) + FOREIGN KEY (publication_version_id) + REFERENCES publication_version(id) ON DELETE CASCADE, - PRIMARY KEY (publication, version, stele, library_mpath, status) + PRIMARY KEY (publication_version_id, library_mpath, status) ); CREATE TABLE changed_library_document ( - publication TEXT, - version TEXT, - stele TEXT, - doc_mpath TEXT, - status TEXT, library_mpath TEXT, - url 
TEXT, + document_change_id TEXT, CONSTRAINT fk_document_change - FOREIGN KEY (publication, version, stele, doc_mpath, status) - REFERENCES document_change(publication, version, stele, doc_mpath, status) + FOREIGN KEY (document_change_id) + REFERENCES document_change(id) ON DELETE CASCADE, - PRIMARY KEY (publication, version, stele, library_mpath, doc_mpath, status) + PRIMARY KEY (document_change_id, library_mpath) ); CREATE INDEX library_change_library_mpath_idx ON library_change(library_mpath COLLATE NOCASE); CREATE INDEX changed_library_document_library_mpath_idx ON changed_library_document(library_mpath COLLATE NOCASE); diff --git a/src/db/mod.rs b/src/db/mod.rs index ee4d149..8954e47 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,6 +1,7 @@ //! Database related module. #![allow(clippy::unreachable)] use async_trait::async_trait; +use sqlx::Transaction; use std::str::FromStr; use sqlx::any::{self, AnyPoolOptions}; @@ -12,8 +13,6 @@ use tracing::instrument; pub mod init; /// Models for the database. pub mod models; -/// Statements for the database. -pub mod statements; #[async_trait] /// Generic Database @@ -25,6 +24,17 @@ pub trait Db { async fn connect(url: &str) -> anyhow::Result; } +#[async_trait] +/// Generic transaction +pub trait Tx { + /// Begin a transaction. + async fn begin(pool: AnyPool) -> anyhow::Result; + /// Commit a transaction. + async fn commit(self) -> anyhow::Result<()>; + /// Rollback a transaction. + async fn rollback(self) -> anyhow::Result<()>; +} + /// Type of database connection. #[derive(Debug, Clone)] pub enum DatabaseKind { @@ -43,6 +53,12 @@ pub struct DatabaseConnection { pub kind: DatabaseKind, } +/// Database transaction. +pub struct DatabaseTransaction { + /// Database transaction. + pub tx: Transaction<'static, sqlx::Any>, +} + #[async_trait] impl Db for DatabaseConnection { /// Connects to a database. 
@@ -72,3 +88,23 @@ impl Db for DatabaseConnection { Ok(connection) } } + +#[async_trait] +impl Tx for DatabaseTransaction { + /// Begin a transaction. + async fn begin(pool: AnyPool) -> anyhow::Result { + let tx = pool.begin().await?; + Ok(Self { tx }) + } + /// Commit a transaction. + async fn commit(self) -> anyhow::Result<()> { + self.tx.commit().await?; + Ok(()) + } + + /// Rollback a transaction. + async fn rollback(self) -> anyhow::Result<()> { + self.tx.rollback().await?; + Ok(()) + } +} diff --git a/src/db/models/changed_library_document.rs b/src/db/models/changed_library_document.rs deleted file mode 100644 index 7a734db..0000000 --- a/src/db/models/changed_library_document.rs +++ /dev/null @@ -1,21 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -/// Model for library (collection) change events. -pub struct ChangedLibraryDocument { - /// Foreign key reference to publication name - pub publication: String, - /// Foreign key reference to codified date in a publication in %Y-%m-%d format - pub version: String, - /// Foreign key reference to stele identifier in / format. - pub stele: String, - /// Materialized path to the document - pub doc_mpath: String, - /// Change status of the document. - /// Currently could be 'Element added', 'Element effective', 'Element changed' or 'Element removed'. - pub status: String, - /// Materialized path to the library - pub library_mpath: String, - /// Url to the library that was changed. - pub url: String, -} diff --git a/src/db/models/changed_library_document/manager.rs b/src/db/models/changed_library_document/manager.rs new file mode 100644 index 0000000..0ab29f2 --- /dev/null +++ b/src/db/models/changed_library_document/manager.rs @@ -0,0 +1,31 @@ +//! Manager for the changed library document model. 
+use super::ChangedLibraryDocument; +use crate::db::{models::BATCH_SIZE, DatabaseTransaction}; +use async_trait::async_trait; +use sqlx::QueryBuilder; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of changed library documents into the database. + /// + /// # Errors + /// Errors if the changed library documents cannot be inserted into the database. + async fn insert_bulk( + &mut self, + changed_library_document: Vec, + ) -> anyhow::Result<()> { + let mut query_builder = QueryBuilder::new( + "INSERT OR IGNORE INTO changed_library_document ( library_mpath, document_change_id ) ", + ); + for chunk in changed_library_document.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, cl| { + bindings + .push_bind(&cl.library_mpath) + .push_bind(&cl.document_change_id); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + } + Ok(()) + } +} diff --git a/src/db/models/changed_library_document/mod.rs b/src/db/models/changed_library_document/mod.rs new file mode 100644 index 0000000..bf38318 --- /dev/null +++ b/src/db/models/changed_library_document/mod.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing transactional changed library documents. +#[async_trait] +pub trait TxManager { + /// Insert bulk of changed library documents. + async fn insert_bulk( + &mut self, + changed_library_document: Vec, + ) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +/// Model for library (collection) change events. +pub struct ChangedLibraryDocument { + /// Foreign key reference to `document_change` id. + pub document_change_id: String, + /// Materialized path to the library + pub library_mpath: String, +} + +impl ChangedLibraryDocument { + /// Create a new library change. 
+ #[must_use] + pub const fn new(document_change_id: String, library_mpath: String) -> Self { + Self { + document_change_id, + library_mpath, + } + } +} diff --git a/src/db/models/document.rs b/src/db/models/document.rs deleted file mode 100644 index 5499822..0000000 --- a/src/db/models/document.rs +++ /dev/null @@ -1,8 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -/// Model for documents. -pub struct Document { - /// Unique document identifier. - pub doc_id: String, -} diff --git a/src/db/models/document/manager.rs b/src/db/models/document/manager.rs new file mode 100644 index 0000000..a4d4f2f --- /dev/null +++ b/src/db/models/document/manager.rs @@ -0,0 +1,23 @@ +//! Manager for the document model. +use crate::db::DatabaseTransaction; +use async_trait::async_trait; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a new document into the database. + /// + /// # Errors + /// Errors if the document cannot be inserted into the database. + async fn create(&mut self, doc_id: &str) -> anyhow::Result> { + let statement = " + INSERT OR IGNORE INTO document ( doc_id ) + VALUES ( $1 ) + "; + let id = sqlx::query(statement) + .bind(doc_id) + .execute(&mut *self.tx) + .await? + .last_insert_id(); + Ok(id) + } +} diff --git a/src/db/models/document/mod.rs b/src/db/models/document/mod.rs new file mode 100644 index 0000000..58c6b28 --- /dev/null +++ b/src/db/models/document/mod.rs @@ -0,0 +1,18 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing transactional documents. +#[async_trait] +pub trait TxManager { + /// Create a new publication version. + async fn create(&mut self, doc_id: &str) -> anyhow::Result>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +/// Model for documents. +pub struct Document { + /// Unique document identifier. 
+ pub doc_id: String, +} diff --git a/src/db/models/document_change.rs b/src/db/models/document_change.rs deleted file mode 100644 index b91b889..0000000 --- a/src/db/models/document_change.rs +++ /dev/null @@ -1,39 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use super::version::Version; - -/// Trait for managing document changes. -#[async_trait] -pub trait Manager { - /// Find one document materialized path by url. - async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result; - /// All dates on which given document changed. - async fn find_all_document_versions_by_mpath_and_publication( - &self, - mpath: &str, - publication: &str, - ) -> anyhow::Result>; -} - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -/// Model for document change events. -pub struct DocumentChange { - /// Materialized path to the document - pub doc_mpath: String, - /// Change status of the document. - /// Currently could be 'Element added', 'Element effective', 'Element changed' or 'Element removed'. - pub status: String, - /// Url to the document that was changed. - pub url: String, - /// Optional reason for the change event. - pub change_reason: Option, - /// Foreign key reference to the publication name. - pub publication: String, - /// Foreign key reference to codified date in a publication in %Y-%m-%d format - pub version: String, - /// Foreign key reference to stele identifier in / format. - pub stele: String, - /// Foreign key reference to document id. - pub doc_id: String, -} diff --git a/src/db/models/document_change/manager.rs b/src/db/models/document_change/manager.rs new file mode 100644 index 0000000..faed424 --- /dev/null +++ b/src/db/models/document_change/manager.rs @@ -0,0 +1,129 @@ +//! Manager for the document change model. 
+use super::DocumentChange; +use crate::db::{ + models::{status::Status, version::Version, BATCH_SIZE}, + DatabaseConnection, DatabaseKind, DatabaseTransaction, +}; +use async_trait::async_trait; +use chrono::NaiveDate; +use sqlx::QueryBuilder; + +#[async_trait] +impl super::Manager for DatabaseConnection { + /// All dates on which given document changed. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_all_document_versions_by_mpath_and_publication( + &self, + mpath: &str, + publication_id: &str, + ) -> anyhow::Result> { + let mut statement = " + SELECT DISTINCT pv.version AS codified_date + FROM document_change dc + LEFT JOIN publication_has_publication_versions phpv ON dc.publication_version_id = phpv.publication_version_id + LEFT JOIN publication_version pv ON phpv.publication_version_id = pv.id + WHERE dc.doc_mpath LIKE $1 AND phpv.publication_id = $2 + "; + let mut rows = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Version>(statement) + .bind(format!("{mpath}%")) + .bind(publication_id) + .fetch_all(&mut *connection) + .await? 
+ } + }; + statement = " + SELECT pv.version AS codified_date + FROM document_change dc + LEFT JOIN publication_has_publication_versions phpv ON dc.publication_version_id = phpv.publication_version_id + LEFT JOIN publication_version pv ON phpv.publication_version_id = pv.id + WHERE dc.doc_mpath = $1 AND phpv.publication_id = $2 AND dc.status = $3 + LIMIT 1 + "; + let element_added = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Version>(statement) + .bind(mpath) + .bind(publication_id) + .bind(Status::ElementAdded.to_int()) + .fetch_one(&mut *connection) + .await + .ok() + } + }; + + if element_added.is_none() { + // When element doesn't have date added, it means we're looking + // at an old publication and this element doesn't yet exist in it + rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); + return Ok(rows); + } + + statement = " + SELECT pv.version AS codified_date + FROM document_change dc + LEFT JOIN publication_has_publication_versions phpv ON dc.publication_version_id = phpv.publication_version_id + LEFT JOIN publication_version pv ON phpv.publication_version_id = pv.id + WHERE dc.doc_mpath = $1 AND phpv.publication_id = $2 AND dc.status = $3 + LIMIT 1 + "; + let mut doc = mpath.split('|').next().unwrap_or("").to_owned(); + doc.push('|'); + + let document_effective = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Version>(statement) + .bind(doc) + .bind(publication_id) + .bind(Status::ElementEffective.to_int()) + .fetch_one(&mut *connection) + .await + .ok() + } + }; + + if let (Some(doc_effective), Some(el_added)) = (document_effective, element_added) { + if !rows.contains(&doc_effective) + && NaiveDate::parse_from_str(&doc_effective.codified_date, "%Y-%m-%d") + .unwrap_or_default() + > NaiveDate::parse_from_str(&el_added.codified_date, "%Y-%m-%d") 
+ .unwrap_or_default() + { + rows.push(doc_effective); + } + } + rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); + Ok(rows) + } +} + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of document changes into the database. + /// + /// # Errors + /// Errors if the document changes cannot be inserted into the database. + async fn insert_bulk(&mut self, document_changes: Vec) -> anyhow::Result<()> { + let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO document_change ( id, status, change_reason, publication_version_id, doc_mpath ) "); + for chunk in document_changes.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, dc| { + bindings + .push_bind(&dc.id) + .push_bind(dc.status) + .push_bind(&dc.change_reason) + .push_bind(&dc.publication_version_id) + .push_bind(&dc.doc_mpath); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + query_builder.reset(); + } + Ok(()) + } +} diff --git a/src/db/models/document_change/mod.rs b/src/db/models/document_change/mod.rs new file mode 100644 index 0000000..f44f169 --- /dev/null +++ b/src/db/models/document_change/mod.rs @@ -0,0 +1,60 @@ +use super::version::Version; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing document changes. +#[async_trait] +pub trait Manager { + /// All dates on which given document changed. + async fn find_all_document_versions_by_mpath_and_publication( + &self, + mpath: &str, + publication: &str, + ) -> anyhow::Result>; +} + +/// Trait for managing transactional document changes. +#[async_trait] +pub trait TxManager { + /// Insert a bulk of document changes. + async fn insert_bulk(&mut self, document_changes: Vec) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +/// Model for document change events. +pub struct DocumentChange { + /// A hashed identifier for the document change. 
+ /// The hash is generated from the `publication_version` id, `doc_mpath`, and `status` (as integer) field. + pub id: String, + /// Change status of the document. + /// Currently could be 'Element added' = 0, 'Element effective' = 1, 'Element changed' = 2 or 'Element removed' = 3. + pub status: i64, + /// Optional reason for the change event. + pub change_reason: Option, + /// Foreign key reference to the `publication_version` id. + pub publication_version_id: String, + /// Materialized path to the document + pub doc_mpath: String, +} + +impl DocumentChange { + /// Create a new document change. + #[must_use] + pub const fn new( + id: String, + status: i64, + change_reason: Option, + publication_version_id: String, + doc_mpath: String, + ) -> Self { + Self { + id, + status, + change_reason, + publication_version_id, + doc_mpath, + } + } +} diff --git a/src/db/models/document_element/manager.rs b/src/db/models/document_element/manager.rs new file mode 100644 index 0000000..c4e213c --- /dev/null +++ b/src/db/models/document_element/manager.rs @@ -0,0 +1,57 @@ +//! Manager for the document element model. +use async_trait::async_trait; +use sqlx::QueryBuilder; + +use crate::db::{models::BATCH_SIZE, DatabaseConnection, DatabaseKind, DatabaseTransaction}; + +use super::DocumentElement; + +#[async_trait] +impl super::Manager for DatabaseConnection { + /// Find one document materialized path by url. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result { + let statement = " + SELECT de.doc_mpath + FROM document_element de + WHERE de.url = $1 + LIMIT 1 + "; + let row = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, (String,)>(statement) + .bind(url) + .fetch_one(&mut *connection) + .await? 
+ } + }; + Ok(row.0) + } +} + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of document elements into the database. + /// + /// # Errors + /// Errors if the document elements cannot be inserted into the database. + async fn insert_bulk(&mut self, document_elements: Vec) -> anyhow::Result<()> { + let mut query_builder = + QueryBuilder::new("INSERT OR IGNORE INTO document_element ( doc_mpath, url, doc_id ) "); + for chunk in document_elements.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, de| { + bindings + .push_bind(&de.doc_mpath) + .push_bind(&de.url) + .push_bind(&de.doc_id); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + query_builder.reset(); + } + Ok(()) + } +} diff --git a/src/db/models/document_element/mod.rs b/src/db/models/document_element/mod.rs new file mode 100644 index 0000000..fa03832 --- /dev/null +++ b/src/db/models/document_element/mod.rs @@ -0,0 +1,41 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing document elements. +#[async_trait] +pub trait Manager { + /// Find one document materialized path by url. + async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result; +} + +/// Trait for managing transactional document elements. +#[async_trait] +pub trait TxManager { + /// Insert a bulk of document elements. + async fn insert_bulk(&mut self, document_elements: Vec) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +/// Model for document elements. +pub struct DocumentElement { + /// Materialized path to the document + pub doc_mpath: String, + /// Url to the document that was changed. + pub url: String, + /// Unique document identifier. + pub doc_id: String, +} + +impl DocumentElement { + /// Create a new document element. 
+ #[must_use] + pub const fn new(doc_mpath: String, url: String, doc_id: String) -> Self { + Self { + doc_mpath, + url, + doc_id, + } + } +} diff --git a/src/db/models/library.rs b/src/db/models/library.rs deleted file mode 100644 index d6643bc..0000000 --- a/src/db/models/library.rs +++ /dev/null @@ -1,8 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -/// Model for library (collection). -pub struct Library { - /// Materialized path to the library - pub mpath: String, -} diff --git a/src/db/models/library/manager.rs b/src/db/models/library/manager.rs new file mode 100644 index 0000000..3197beb --- /dev/null +++ b/src/db/models/library/manager.rs @@ -0,0 +1,50 @@ +//! Manager for the library model. +use super::Library; +use crate::db::{models::BATCH_SIZE, DatabaseConnection, DatabaseKind, DatabaseTransaction}; +use async_trait::async_trait; +use sqlx::QueryBuilder; + +#[async_trait] +impl super::Manager for DatabaseConnection { + /// Find one library materialized path by url. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result { + let statement = " + SELECT l.mpath + FROM library l + WHERE l.url = $1 + LIMIT 1 + "; + let row = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, (String,)>(statement) + .bind(url) + .fetch_one(&mut *connection) + .await? + } + }; + Ok(row.0) + } +} + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of libraries into the database. + /// + /// # Errors + /// Errors if the libraries cannot be inserted into the database. 
+ async fn insert_bulk(&mut self, libraries: Vec) -> anyhow::Result<()> { + let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO library ( mpath, url ) "); + for chunk in libraries.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, lb| { + bindings.push_bind(&lb.mpath).push_bind(&lb.url); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + } + Ok(()) + } +} diff --git a/src/db/models/library/mod.rs b/src/db/models/library/mod.rs new file mode 100644 index 0000000..e1cf49f --- /dev/null +++ b/src/db/models/library/mod.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing collection changes. +#[async_trait] +pub trait Manager { + /// Find one library materialized path by url. + async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result; +} + +/// Trait for managing transactions on publication versions. +#[async_trait] +pub trait TxManager { + /// Insert bulk libraries. + async fn insert_bulk(&mut self, libraries: Vec) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +/// Model for library (collection). +pub struct Library { + /// Materialized path to the collection + pub mpath: String, + /// Url to the collection. + pub url: String, +} + +impl Library { + /// Create a new library. + #[must_use] + pub const fn new(mpath: String, url: String) -> Self { + Self { mpath, url } + } +} diff --git a/src/db/models/library_change.rs b/src/db/models/library_change.rs deleted file mode 100644 index 2e3ea27..0000000 --- a/src/db/models/library_change.rs +++ /dev/null @@ -1,35 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use super::version::Version; - -/// Trait for managing collection changes. -#[async_trait] -pub trait Manager { - /// Find one library materialized path by url. 
- async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result; - /// All dates on which given documents within a collection changed. - async fn find_all_collection_versions_by_mpath_and_publication( - &self, - mpath: &str, - publication: &str, - ) -> anyhow::Result>; -} - -#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)] -/// Model for library (collection) change events. -pub struct LibraryChange { - /// Foreign key reference to publication name - pub publication: String, - /// Foreign key reference to codified date in a publication in %Y-%m-%d format - pub version: String, - /// Foreign key reference to stele identifier in / format. - pub stele: String, - /// Change status of the document. - /// Currently could be 'Element added', 'Element effective', 'Element changed' or 'Element removed'. - pub status: String, - /// Url to the library that was changed. - pub url: String, - /// Materialized path to the library - pub library_mpath: String, -} diff --git a/src/db/models/library_change/manager.rs b/src/db/models/library_change/manager.rs new file mode 100644 index 0000000..03a859c --- /dev/null +++ b/src/db/models/library_change/manager.rs @@ -0,0 +1,91 @@ +//! Manager for the library change model. +use crate::db::{ + models::{status::Status, version::Version, BATCH_SIZE}, + DatabaseConnection, DatabaseKind, DatabaseTransaction, +}; +use async_trait::async_trait; +use sqlx::QueryBuilder; + +use super::LibraryChange; + +#[async_trait] +impl super::Manager for DatabaseConnection { + /// All dates on which documents from this collection changed. + /// + /// # Errors + /// Errors if can't establish a connection to the database. 
+ async fn find_all_collection_versions_by_mpath_and_publication( + &self, + mpath: &str, + publication_id: &str, + ) -> anyhow::Result> { + let mut statement = " + SELECT DISTINCT pv.version AS codified_date + FROM changed_library_document cld + LEFT JOIN document_change dc on cld.document_change_id = dc.id + LEFT JOIN publication_has_publication_versions phpv ON dc.publication_version_id = phpv.publication_version_id + LEFT JOIN publication_version pv ON phpv.publication_version_id = pv.id + WHERE cld.library_mpath LIKE $1 AND phpv.publication_id = $2 + "; + let mut rows = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Version>(statement) + .bind(format!("{mpath}%")) + .bind(publication_id) + .fetch_all(&mut *connection) + .await? + } + }; + statement = " + SELECT DISTINCT pv.version AS codified_date + FROM library_change lc + LEFT JOIN publication_has_publication_versions phpv ON lc.publication_version_id = phpv.publication_version_id + LEFT JOIN publication_version pv ON phpv.publication_version_id = pv.id + WHERE lc.library_mpath LIKE $1 AND lc.status = $2 AND phpv.publication_id = $3 + LIMIT 1 + "; + let element_added = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Version>(statement) + .bind(format!("{mpath}%")) + .bind(Status::ElementAdded.to_int()) + .bind(publication_id) + .fetch_one(&mut *connection) + .await + .ok() + } + }; + + if let Some(el_added) = element_added { + if !rows.contains(&el_added) { + rows.push(el_added); + } + } + rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); + Ok(rows) + } +} + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of library changes into the database. + /// + /// # Errors + /// Errors if the library changes cannot be inserted into the database. 
+ async fn insert_bulk(&mut self, library_changes: Vec) -> anyhow::Result<()> { + let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO library_change ( library_mpath, publication_version_id, status ) "); + for chunk in library_changes.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, lc| { + bindings + .push_bind(&lc.library_mpath) + .push_bind(&lc.publication_version_id) + .push_bind(lc.status); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + } + Ok(()) + } +} diff --git a/src/db/models/library_change/mod.rs b/src/db/models/library_change/mod.rs new file mode 100644 index 0000000..68e3fc0 --- /dev/null +++ b/src/db/models/library_change/mod.rs @@ -0,0 +1,47 @@ +use super::version::Version; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing collection changes. +#[async_trait] +pub trait Manager { + /// All dates on which given documents within a collection changed. + async fn find_all_collection_versions_by_mpath_and_publication( + &self, + mpath: &str, + publication: &str, + ) -> anyhow::Result>; +} + +/// Trait for managing transactional collection changes. +#[async_trait] +pub trait TxManager { + /// Insert a bulk of collection changes. + async fn insert_bulk(&mut self, library_changes: Vec) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)] +/// Model for library (collection) change events. +pub struct LibraryChange { + /// Foreign key reference to `publication_version` id. + pub publication_version_id: String, + /// Change status of the document. + /// Currently could be 'Element added' = 0, 'Element effective' = 1, 'Element changed' = 2 or 'Element removed' = 3. + pub status: i64, + /// Materialized path to the library + pub library_mpath: String, +} + +impl LibraryChange { + /// Create a new library change. 
+ #[must_use] + pub const fn new(publication_version_id: String, status: i64, library_mpath: String) -> Self { + Self { + publication_version_id, + status, + library_mpath, + } + } +} diff --git a/src/db/models/mod.rs b/src/db/models/mod.rs index de42bf7..b70df07 100644 --- a/src/db/models/mod.rs +++ b/src/db/models/mod.rs @@ -1,22 +1,29 @@ //! This module contains all the sqlx structs for the database tables. -/// sqlx structs for `changed_library_document` table. +/// Size of the batch for bulk inserts. +const BATCH_SIZE: usize = 1000; + +/// module for interacting with the `changed_library_document` table. pub mod changed_library_document; -/// sqlx structs for `document` table. +/// module for interacting with the `document` table. pub mod document; -/// sqlx structs for `document_change` table. +/// module for interacting with the `document_change` table. pub mod document_change; -/// sqlx structs for `library` table. +/// module for interacting with the `document_element` table. +pub mod document_element; +/// module for interacting with the `library` table. pub mod library; -/// sqlx structs for `library_change` table. +/// module for interacting with the `library_change` table. pub mod library_change; -/// sqlx structs for `publication` table. +/// module for interacting with the `publication` table. pub mod publication; -/// sqlx structs for `publication_has_publication_versions` table. +/// module for interacting with the `publication_has_publication_versions` table. pub mod publication_has_publication_versions; -/// sqlx structs for `publication_version` table +/// module for interacting with the `publication_version` table pub mod publication_version; -/// sqlx structs for `stele` table. +/// module for the document or library status utility. +pub mod status; +/// module for interacting with the `stele` table. pub mod stele; -/// sqlx structs for `version` table. +/// module for interacting with the `version` table. 
pub mod version; diff --git a/src/db/models/publication/manager.rs b/src/db/models/publication/manager.rs new file mode 100644 index 0000000..0be81b8 --- /dev/null +++ b/src/db/models/publication/manager.rs @@ -0,0 +1,153 @@ +//! Manager for the publication model. +use crate::db::{DatabaseConnection, DatabaseKind, DatabaseTransaction}; +use async_trait::async_trait; +use chrono::NaiveDate; + +use super::Publication; + +#[async_trait] +impl super::Manager for DatabaseConnection { + /// Find all publications which are not revoked for a given stele. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_all_non_revoked_publications( + &self, + stele: &str, + ) -> anyhow::Result> { + let statement = " + SELECT * + FROM publication + WHERE revoked = 0 AND stele = $1 + ORDER BY name DESC + "; + let rows = match self.kind { + DatabaseKind::Postgres | DatabaseKind::Sqlite => { + let mut connection = self.pool.acquire().await?; + sqlx::query_as::<_, Publication>(statement) + .bind(stele) + .fetch_all(&mut *connection) + .await? + } + }; + Ok(rows) + } +} + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a new publication into the database. + /// + /// # Errors + /// Errors if the publication cannot be inserted into the database. + async fn create( + &mut self, + hash_id: &str, + name: &str, + date: &NaiveDate, + stele: &str, + last_valid_publication_id: Option, + last_valid_version: Option, + ) -> anyhow::Result> { + let statement = " + INSERT OR IGNORE INTO publication ( id, name, date, stele, revoked, last_valid_publication_id, last_valid_version ) + VALUES ( $1, $2, $3, $4, FALSE, $5, $6) + "; + let id = sqlx::query(statement) + .bind(hash_id) + .bind(name) + .bind(date.to_string()) + .bind(stele) + .bind(last_valid_publication_id) + .bind(last_valid_version) + .execute(&mut *self.tx) + .await? + .last_insert_id(); + Ok(id) + } + /// Update a publication by name and stele to be revoked. 
+ /// + /// # Errors + /// Errors if the publication cannot be updated. + async fn update_by_name_and_stele_set_revoked_true( + &mut self, + name: &str, + stele: &str, + ) -> anyhow::Result<()> { + let statement = " + UPDATE publication + SET revoked = TRUE + WHERE name = $1 AND stele = $2 + "; + sqlx::query(statement) + .bind(name) + .bind(stele) + .execute(&mut *self.tx) + .await?; + Ok(()) + } + /// Find the most recent (by `date`) non-revoked publication for a `stele`. + /// + /// # Errors + /// Query failures are not propagated: they yield `Ok(None)`, as does a missing row. + async fn find_last_inserted(&mut self, stele: &str) -> anyhow::Result> { + let statement = " + SELECT * + FROM publication + WHERE revoked = 0 AND stele = $1 + ORDER BY date DESC + LIMIT 1 + "; + let row = sqlx::query_as::<_, Publication>(statement) + .bind(stele) + .fetch_one(&mut *self.tx) + .await + .ok(); + Ok(row) + } + + /// Find a publication by `name` and `stele`. + /// + /// # Errors + /// Errors if the query fails or no matching publication is found. + async fn find_by_name_and_stele( + &mut self, + name: &str, + stele: &str, + ) -> anyhow::Result { + let statement = " + SELECT * + FROM publication + WHERE name = $1 AND stele = $2 + "; + let row = sqlx::query_as::<_, Publication>(statement) + .bind(name) + .bind(stele) + .fetch_one(&mut *self.tx) + .await?; + Ok(row) + } + + /// Find all publications by date and stele, ordered by name descending. + /// + /// # Errors + /// Errors if can't establish a connection to the database.
+ async fn find_all_by_date_and_stele_order_by_name_desc( + &mut self, + date: String, + stele: String, + ) -> anyhow::Result> { + let statement = " + SELECT * + FROM publication + WHERE date = $1 AND stele = $2 + ORDER BY name DESC + "; + let rows = sqlx::query_as::<_, Publication>(statement) + .bind(date) + .bind(stele) + .fetch_all(&mut *self.tx) + .await?; + Ok(rows) + } +} diff --git a/src/db/models/publication.rs b/src/db/models/publication/mod.rs similarity index 52% rename from src/db/models/publication.rs rename to src/db/models/publication/mod.rs index c73df08..3fdfde5 100644 --- a/src/db/models/publication.rs +++ b/src/db/models/publication/mod.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; +use chrono::NaiveDate; use serde::{Deserialize, Serialize}; use sqlx::{any::AnyRow, FromRow, Row}; +pub mod manager; + /// Trait for managing publications. #[async_trait] pub trait Manager { @@ -12,9 +15,48 @@ pub trait Manager { ) -> anyhow::Result>; } +/// Trait for managing transactions on publications. +#[async_trait] +pub trait TxManager { + /// Create a new publication. + async fn create( + &mut self, + hash_id: &str, + name: &str, + date: &NaiveDate, + stele: &str, + last_valid_publication_id: Option, + last_valid_version: Option, + ) -> anyhow::Result>; + /// Update a publication by name and set revoked to true. + async fn update_by_name_and_stele_set_revoked_true( + &mut self, + name: &str, + stele: &str, + ) -> anyhow::Result<()>; + /// Find the last inserted publication for a given stele. + async fn find_last_inserted(&mut self, stele: &str) -> anyhow::Result>; + /// Find a publication by name and stele. + async fn find_by_name_and_stele( + &mut self, + name: &str, + stele: &str, + ) -> anyhow::Result; + /// Find all by date and stele and sort by name in descending order. + /// Used in revocation logic to find the latest publication. 
+ async fn find_all_by_date_and_stele_order_by_name_desc( + &mut self, + date: String, + stele: String, + ) -> anyhow::Result>; +} + #[derive(Deserialize, Serialize, Debug)] /// Model for a Stele. pub struct Publication { + /// A hashed identifier for the publication. + /// The hash is generated from the `name` and `stele` fields of the publication. + pub id: String, /// Name of the publication in %YYYY-%MM-%DD format /// with optionally incrementing version numbers /// when two publications exist on same date. @@ -29,7 +71,7 @@ pub struct Publication { pub revoked: i64, /// If a publication is derived from another publication, /// represents the last publication name that was valid before this publication. - pub last_valid_publication_name: Option, + pub last_valid_publication_id: Option, /// If a publication is derived from another publication, /// represents the last publication version (codified date) from the previous publication /// that the current publication is derived from. @@ -39,11 +81,12 @@ pub struct Publication { impl FromRow<'_, AnyRow> for Publication { fn from_row(row: &AnyRow) -> anyhow::Result { Ok(Self { + id: row.try_get("id")?, name: row.try_get("name")?, date: row.try_get("date")?, stele: row.try_get("stele")?, revoked: row.try_get("revoked")?, - last_valid_publication_name: row.try_get("last_valid_publication_name").ok(), + last_valid_publication_id: row.try_get("last_valid_publication_id").ok(), last_valid_version: row.try_get("last_valid_version").ok(), }) } @@ -52,13 +95,14 @@ impl FromRow<'_, AnyRow> for Publication { impl Publication { /// Create a new publication. 
#[must_use] - pub const fn new(name: String, date: String, stele: String) -> Self { + pub const fn new(id: String, name: String, date: String, stele: String) -> Self { Self { + id, name, date, stele, revoked: 0, - last_valid_publication_name: None, + last_valid_publication_id: None, last_valid_version: None, } } diff --git a/src/db/models/publication_has_publication_versions.rs b/src/db/models/publication_has_publication_versions.rs deleted file mode 100644 index ee035bf..0000000 --- a/src/db/models/publication_has_publication_versions.rs +++ /dev/null @@ -1,15 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(sqlx::FromRow, Deserialize, Serialize, Clone, Debug)] -/// Model for publication which contain publication versions. -pub struct PublicationHasPublicationVersions { - /// Foreign key reference to publication name. - pub publication: String, - /// Publication can reference another publication. - /// Foreign key reference to the referenced publication name. - pub referenced_publication: String, - /// Date in a publication in %Y-%m-%d format - pub referenced_version: String, - /// Foreign key reference to stele. - pub stele: String, -} diff --git a/src/db/models/publication_has_publication_versions/manager.rs b/src/db/models/publication_has_publication_versions/manager.rs new file mode 100644 index 0000000..8a8378c --- /dev/null +++ b/src/db/models/publication_has_publication_versions/manager.rs @@ -0,0 +1,30 @@ +//! Manager for the `publication_has_publication_versions` table. +use crate::db::{models::BATCH_SIZE, DatabaseTransaction}; +use async_trait::async_trait; +use sqlx::QueryBuilder; + +use super::PublicationHasPublicationVersions; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a bulk of `publication_has_publication_versions` into the database. + /// + /// # Errors + /// Errors if the `publication_has_publication_versions` cannot be inserted into the database. 
+ async fn insert_bulk( + &mut self, + publication_has_publication_versions: Vec, + ) -> anyhow::Result<()> { + let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO publication_has_publication_versions ( publication_id, publication_version_id ) "); + for chunk in publication_has_publication_versions.chunks(BATCH_SIZE) { + query_builder.push_values(chunk, |mut bindings, pb| { + bindings + .push_bind(&pb.publication_id) + .push_bind(&pb.publication_version_id); + }); + let query = query_builder.build(); + query.execute(&mut *self.tx).await?; + } + Ok(()) + } +} diff --git a/src/db/models/publication_has_publication_versions/mod.rs b/src/db/models/publication_has_publication_versions/mod.rs new file mode 100644 index 0000000..327296a --- /dev/null +++ b/src/db/models/publication_has_publication_versions/mod.rs @@ -0,0 +1,24 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +pub mod manager; + +/// Trait for managing transactions on publication has publication versions. +#[async_trait] +pub trait TxManager { + /// Upsert a bulk of `publication_has_publication_versions` into the database. + async fn insert_bulk( + &mut self, + publication_has_publication_versions: Vec, + ) -> anyhow::Result<()>; +} + +#[derive(sqlx::FromRow, Deserialize, Serialize, Clone, Debug)] +/// Model for publication which contain publication versions. +pub struct PublicationHasPublicationVersions { + /// Foreign key reference to publication id. + pub publication_id: String, + /// Publication can reference another publication. + /// Foreign key reference to the referenced publication version id. 
+ pub publication_version_id: String, +} diff --git a/src/db/models/publication_version.rs b/src/db/models/publication_version.rs deleted file mode 100644 index 8893a23..0000000 --- a/src/db/models/publication_version.rs +++ /dev/null @@ -1,26 +0,0 @@ -use serde::{Deserialize, Serialize}; -use sqlx::{any::AnyRow, FromRow, Row}; - -#[derive(Deserialize, Serialize, Hash, Eq, PartialEq, Clone)] -/// Model for a Stele. -pub struct PublicationVersion { - /// Date in a publication in %Y-%m-%d format - pub version: String, - /// Foreign key reference to publication name. - pub publication: String, - /// Foreign key reference to stele. - pub stele: String, - /// Reason for building the publication. - pub build_reason: Option, -} - -impl FromRow<'_, AnyRow> for PublicationVersion { - fn from_row(row: &AnyRow) -> anyhow::Result { - Ok(Self { - version: row.try_get("version")?, - publication: row.try_get("publication")?, - stele: row.try_get("stele")?, - build_reason: row.try_get("build_reason").ok(), - }) - } -} diff --git a/src/db/models/publication_version/manager.rs b/src/db/models/publication_version/manager.rs new file mode 100644 index 0000000..51dacab --- /dev/null +++ b/src/db/models/publication_version/manager.rs @@ -0,0 +1,147 @@ +//! Manager for the `publication_version` model. +use super::PublicationVersion; +use crate::db::DatabaseTransaction; +use async_trait::async_trait; +use std::collections::HashSet; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a new publication version into the database. + /// + /// # Errors + /// Errors if the publication version cannot be inserted into the database. 
+ async fn create( + &mut self, + hash_id: &str, + publication_id: &str, + codified_date: &str, + ) -> anyhow::Result> { + let statement = " + INSERT OR IGNORE INTO publication_version ( id, publication_id, version ) + VALUES ( $1, $2, $3 ) + "; + let id = sqlx::query(statement) + .bind(hash_id) + .bind(publication_id) + .bind(codified_date) + .execute(&mut *self.tx) + .await? + .last_insert_id(); + Ok(id) + } + + /// Find the publication version with the latest `version` date for a `publication_id`. + /// + /// # Errors + /// Query failures are not propagated: they yield `Ok(None)`, as does a missing row. + async fn find_last_inserted_date_by_publication_id( + &mut self, + publication_id: &str, + ) -> anyhow::Result> { + let statement = " + SELECT * + FROM publication_version + WHERE publication_id = $1 + ORDER BY version DESC + LIMIT 1 + "; + let row = sqlx::query_as::<_, PublicationVersion>(statement) + .bind(publication_id) + .fetch_one(&mut *self.tx) + .await + .ok(); + Ok(row) + } + + /// Find all publication versions by `publication_id`. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_all_by_publication_id( + &mut self, + publication_id: &str, + ) -> anyhow::Result> { + let statement = " + SELECT * + FROM publication_version + WHERE publication_id = $1 + "; + let rows = sqlx::query_as::<_, PublicationVersion>(statement) + .bind(publication_id) + .fetch_all(&mut *self.tx) + .await?; + Ok(rows) + } + + /// Find all publication versions linked to the given publication ids in the `publication_has_publication_versions` table.
+ async fn find_all_in_publication_has_publication_versions( + &mut self, + publication_ids: Vec, + ) -> anyhow::Result> { + let parameters = publication_ids + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + let statement = format!( + " + SELECT DISTINCT * + FROM publication_has_publication_versions phpv + JOIN publication_version pv ON pv.id = phpv.publication_version_id + WHERE phpv.publication_id IN ({parameters}) + " + ); + let mut query = sqlx::query_as::<_, PublicationVersion>(&statement); + for id in publication_ids { + query = query.bind(id); + } + let rows = query.fetch_all(&mut *self.tx).await?; + Ok(rows) + } + + /// Recursively find all publication versions starting from a given publication ID. + /// + /// This is necessary because publication versions can be the same across publications. + /// To make the versions query simpler, we walk the publication hierarchy starting from + /// `publication_id` looking for related publications. + /// The function returns all the collected `publication_version` rows, even in simple cases where a publication + /// has no hierarchy. + /// + /// # Errors + /// Errors if can't establish a connection to the database. + async fn find_all_recursive_for_publication( + &mut self, + publication_id: String, + ) -> anyhow::Result> { + let mut versions: HashSet = self + .find_all_by_publication_id(&publication_id) + .await? + .into_iter() + .collect(); + let mut checked_publication_ids = HashSet::new(); + checked_publication_ids.insert(publication_id.clone()); + + let mut publication_ids_to_check = HashSet::new(); + publication_ids_to_check.insert(publication_id); + + while !publication_ids_to_check.is_empty() { + let new_versions: HashSet = self + .find_all_in_publication_has_publication_versions( + publication_ids_to_check.clone().into_iter().collect(), + ) + .await?
+ .into_iter() + .collect(); + versions.extend(new_versions.clone()); + checked_publication_ids.extend(publication_ids_to_check.clone()); + + publication_ids_to_check = new_versions + .clone() + .into_iter() + .filter(|pv| !checked_publication_ids.contains(&pv.publication_id.clone())) + .map(|pv| pv.publication_id) + .collect(); + } + Ok(versions.into_iter().collect()) + } +} diff --git a/src/db/models/publication_version/mod.rs b/src/db/models/publication_version/mod.rs new file mode 100644 index 0000000..392e7ef --- /dev/null +++ b/src/db/models/publication_version/mod.rs @@ -0,0 +1,62 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use sqlx::{any::AnyRow, FromRow, Row}; + +pub mod manager; + +/// Trait for managing transactions on publication versions. +#[async_trait] +pub trait TxManager { + /// Create a new publication version. + async fn create( + &mut self, + hash_id: &str, + publication_id: &str, + codified_date: &str, + ) -> anyhow::Result>; + /// Find the publication version with the latest `version` date for a `publication_id`. + async fn find_last_inserted_date_by_publication_id( + &mut self, + publication_id: &str, + ) -> anyhow::Result>; + /// Find all publication versions by a `publication_id`. + async fn find_all_by_publication_id( + &mut self, + publication_id: &str, + ) -> anyhow::Result>; + /// Find all publication versions linked to the given publication ids in `publication_has_publication_versions`. + async fn find_all_in_publication_has_publication_versions( + &mut self, + publication_ids: Vec, + ) -> anyhow::Result>; + /// Find all publication versions for a `publication_id`, recursively following related publications. + async fn find_all_recursive_for_publication( + &mut self, + publication_id: String, + ) -> anyhow::Result>; +} + +#[derive(Deserialize, Serialize, Hash, Eq, PartialEq, Clone)] +/// Model for a publication version. +pub struct PublicationVersion { + /// A hashed identifier for the publication version.
+ /// The hash is generated from the `publication` name, `version` date and `stele` fields. + pub id: String, + /// Date in a publication in %Y-%m-%d format + pub version: String, + /// Foreign key reference to the publication table. + pub publication_id: String, + /// Reason for building the publication. + pub build_reason: Option, +} + +impl FromRow<'_, AnyRow> for PublicationVersion { + fn from_row(row: &AnyRow) -> anyhow::Result { + Ok(Self { + id: row.try_get("id")?, + version: row.try_get("version")?, + publication_id: row.try_get("publication_id")?, + build_reason: row.try_get("build_reason").ok(), + }) + } +} diff --git a/src/db/models/status/mod.rs b/src/db/models/status/mod.rs new file mode 100644 index 0000000..99d995e --- /dev/null +++ b/src/db/models/status/mod.rs @@ -0,0 +1,39 @@ +//! Utility enum representation for the status of a document or library. + +/// Enum representation for the status of a document or library. +pub enum Status { + /// 0 - Element added. + ElementAdded, + /// 2 - Element changed. + ElementChanged, + /// 3 - Element removed. + ElementRemoved, + /// 1 - Element effective. + ElementEffective, +} + +impl Status { + /// Convert a string to a `Status` enum. + /// # Errors + /// Returns an error if the string is not a valid status value. + pub fn from_string(status: &str) -> anyhow::Result { + match status { + "Element added" => Ok(Self::ElementAdded), + "Element changed" => Ok(Self::ElementChanged), + "Element removed" => Ok(Self::ElementRemoved), + "Element effective" => Ok(Self::ElementEffective), + _ => Err(anyhow::anyhow!("Invalid status value")), + } + } + + /// Convert a `Status` enum to an integer.
+ #[must_use] + pub const fn to_int(&self) -> i64 { + match *self { + Self::ElementAdded => 0, + Self::ElementEffective => 1, + Self::ElementChanged => 2, + Self::ElementRemoved => 3, + } + } +} diff --git a/src/db/models/stele/manager.rs b/src/db/models/stele/manager.rs new file mode 100644 index 0000000..3e0cc3a --- /dev/null +++ b/src/db/models/stele/manager.rs @@ -0,0 +1,23 @@ +//! Manager for the stele model. +use crate::db::DatabaseTransaction; +use async_trait::async_trait; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a new stele into the database. + /// + /// # Errors + /// Errors if the stele cannot be inserted into the database. + async fn create(&mut self, stele: &str) -> anyhow::Result> { + let statement = " + INSERT OR IGNORE INTO stele ( name ) + VALUES ( $1 ) + "; + let id = sqlx::query(statement) + .bind(stele) + .execute(&mut *self.tx) + .await? + .last_insert_id(); + Ok(id) + } +} diff --git a/src/db/models/stele.rs b/src/db/models/stele/mod.rs similarity index 51% rename from src/db/models/stele.rs rename to src/db/models/stele/mod.rs index 61a5093..02c49c2 100644 --- a/src/db/models/stele.rs +++ b/src/db/models/stele/mod.rs @@ -1,5 +1,15 @@ +use async_trait::async_trait; use serde::{Deserialize, Serialize}; +pub mod manager; + +/// Trait for managing transactional stele. +#[async_trait] +pub trait TxManager { + /// Create a stele. + async fn create(&mut self, stele: &str) -> anyhow::Result>; +} + #[derive(sqlx::FromRow, Deserialize, Serialize)] /// Model for a Stele. pub struct Stele { diff --git a/src/db/models/version/manager.rs b/src/db/models/version/manager.rs new file mode 100644 index 0000000..faecb89 --- /dev/null +++ b/src/db/models/version/manager.rs @@ -0,0 +1,23 @@ +//! Manager for the version model. +use crate::db::DatabaseTransaction; +use async_trait::async_trait; + +#[async_trait] +impl super::TxManager for DatabaseTransaction { + /// Upsert a new version into the database. 
+ /// + /// # Errors + /// Errors if the version cannot be inserted into the database. + async fn create(&mut self, codified_date: &str) -> anyhow::Result> { + let statement = " + INSERT OR IGNORE INTO version ( codified_date ) + VALUES ( $1 ) + "; + let id = sqlx::query(statement) + .bind(codified_date) + .execute(&mut *self.tx) + .await? + .last_insert_id(); + Ok(id) + } +} diff --git a/src/db/models/version.rs b/src/db/models/version/mod.rs similarity index 69% rename from src/db/models/version.rs rename to src/db/models/version/mod.rs index c64b859..d643a6f 100644 --- a/src/db/models/version.rs +++ b/src/db/models/version/mod.rs @@ -1,9 +1,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +pub mod manager; /// Trait for managing versions. #[async_trait] -pub trait Manager {} +pub trait TxManager { + /// Create a new version. + async fn create(&mut self, codified_date: &str) -> anyhow::Result>; +} #[derive(sqlx::FromRow, Deserialize, Serialize, Debug, Eq, PartialEq)] /// Model for a version. diff --git a/src/db/statements/inserts.rs b/src/db/statements/inserts.rs deleted file mode 100644 index 1668c23..0000000 --- a/src/db/statements/inserts.rs +++ /dev/null @@ -1,410 +0,0 @@ -//! Central place for database queries in Stelae -use sqlx::types::chrono::NaiveDate; -use sqlx::QueryBuilder; - -use crate::db::models::changed_library_document::ChangedLibraryDocument; -use crate::db::models::document_change::DocumentChange; -use crate::db::models::library::Library; -use crate::db::models::library_change::LibraryChange; -use crate::db::models::publication_has_publication_versions::PublicationHasPublicationVersions; -use crate::db::DatabaseConnection; -use crate::db::DatabaseKind; - -/// Size of the batch for bulk inserts. -const BATCH_SIZE: usize = 1000; - -/// Upsert a new document into the database. -/// -/// # Errors -/// Errors if the document cannot be inserted into the database. 
-pub async fn create_document( - conn: &DatabaseConnection, - doc_id: &str, -) -> anyhow::Result> { - let id = match conn.kind { - DatabaseKind::Sqlite => { - let statement = " - INSERT OR IGNORE INTO document ( doc_id ) - VALUES ( $1 ) - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(doc_id) - .execute(&mut *connection) - .await? - .last_insert_id() - } - DatabaseKind::Postgres => { - let statement = " - INSERT INTO document ( doc_id ) - VALUES ( $1 ) - ON CONFLICT ( doc_id ) DO NOTHING; - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(doc_id) - .execute(&mut *connection) - .await? - .last_insert_id() - } - }; - Ok(id) -} - -/// Upsert a bulk of document changes into the database. -/// -/// # Errors -/// Errors if the document changes cannot be inserted into the database. -pub async fn insert_document_changes_bulk( - conn: &DatabaseConnection, - document_changes: Vec, -) -> anyhow::Result<()> { - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO document_change (doc_mpath, status, url, change_reason, publication, version, stele, doc_id) "); - for chunk in document_changes.chunks(BATCH_SIZE) { - query_builder.push_values(chunk, |mut bindings, dc| { - bindings - .push_bind(&dc.doc_mpath) - .push_bind(&dc.status) - .push_bind(&dc.url) - .push_bind(&dc.change_reason) - .push_bind(&dc.publication) - .push_bind(&dc.version) - .push_bind(&dc.stele) - .push_bind(&dc.doc_id); - }); - let query = query_builder.build(); - query.execute(&mut *connection).await?; - query_builder.reset(); - } - } - DatabaseKind::Postgres => { - anyhow::bail!("Not supported yet") - } - }; - - Ok(()) -} - -/// Upsert a bulk of libraries into the database. -/// -/// # Errors -/// Errors if the libraries cannot be inserted into the database. 
-pub async fn insert_library_bulk( - conn: &DatabaseConnection, - libraries: Vec, -) -> anyhow::Result<()> { - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO library (mpath) "); - for chunk in libraries.chunks(BATCH_SIZE) { - query_builder.push_values(chunk, |mut bindings, lb| { - bindings.push_bind(&lb.mpath); - }); - let query = query_builder.build(); - query.execute(&mut *connection).await?; - } - } - DatabaseKind::Postgres => { - anyhow::bail!("Not supported yet") - } - } - Ok(()) -} - -/// Upsert a bulk of library changes into the database. -/// -/// # Errors -/// Errors if the library changes cannot be inserted into the database. -pub async fn insert_library_changes_bulk( - conn: &DatabaseConnection, - library_changes: Vec, -) -> anyhow::Result<()> { - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO library_change (library_mpath, publication, version, stele, status, url) "); - for chunk in library_changes.chunks(BATCH_SIZE) { - query_builder.push_values(chunk, |mut bindings, lc| { - bindings - .push_bind(&lc.library_mpath) - .push_bind(&lc.publication) - .push_bind(&lc.version) - .push_bind(&lc.stele) - .push_bind(&lc.status) - .push_bind(&lc.url); - }); - let query = query_builder.build(); - query.execute(&mut *connection).await?; - } - } - DatabaseKind::Postgres => { - anyhow::bail!("Not supported yet") - } - } - Ok(()) -} - -/// Upsert a bulk of changed library documents into the database. -/// -/// # Errors -/// Errors if the changed library documents cannot be inserted into the database. 
-pub async fn insert_changed_library_document_bulk( - conn: &DatabaseConnection, - changed_library_document: Vec, -) -> anyhow::Result<()> { - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO changed_library_document (publication, version, stele, doc_mpath, status, library_mpath, url) "); - for chunk in changed_library_document.chunks(BATCH_SIZE) { - query_builder.push_values(chunk, |mut bindings, cl| { - bindings - .push_bind(&cl.publication) - .push_bind(&cl.version) - .push_bind(&cl.stele) - .push_bind(&cl.doc_mpath) - .push_bind(&cl.status) - .push_bind(&cl.library_mpath) - .push_bind(&cl.url); - }); - let query = query_builder.build(); - query.execute(&mut *connection).await?; - } - } - DatabaseKind::Postgres => { - anyhow::bail!("Not supported yet") - } - } - Ok(()) -} - -/// Upsert a bulk of `publication_has_publication_versions` into the database. -/// -/// # Errors -/// Errors if the `publication_has_publication_versions` cannot be inserted into the database. 
-pub async fn insert_publication_has_publication_versions_bulk( - conn: &DatabaseConnection, - publication_has_publication_versions: Vec, -) -> anyhow::Result<()> { - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO publication_has_publication_versions (publication, referenced_publication, referenced_version, stele) "); - for chunk in publication_has_publication_versions.chunks(BATCH_SIZE) { - query_builder.push_values(chunk, |mut bindings, pb| { - bindings - .push_bind(&pb.publication) - .push_bind(&pb.referenced_publication) - .push_bind(&pb.referenced_version) - .push_bind(&pb.stele); - }); - let query = query_builder.build(); - query.execute(&mut *connection).await?; - } - } - DatabaseKind::Postgres => { - anyhow::bail!("Not supported yet") - } - } - Ok(()) -} - -/// Upsert a new publication into the database. -/// # Errors -/// Errors if the publication cannot be inserted into the database. -pub async fn create_publication( - conn: &DatabaseConnection, - name: &str, - date: &NaiveDate, - stele: &str, - last_valid_publication_name: Option, - last_valid_version: Option, -) -> anyhow::Result> { - let id = match conn.kind { - DatabaseKind::Sqlite => { - let statement = " - INSERT OR IGNORE INTO publication ( name, date, stele, revoked, last_valid_publication_name, last_valid_version ) - VALUES ( $1, $2, $3, FALSE, $4, $5 ) - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(name) - .bind(date.to_string()) - .bind(stele) - .bind(last_valid_publication_name) - .bind(last_valid_version) - .execute(&mut *connection) - .await? 
- .last_insert_id() - } - DatabaseKind::Postgres => { - let statement = " - INSERT INTO publication ( name, date, stele, revoked, last_valid_publication_name, last_valid_version ) - VALUES ( $1, $2, $3, FALSE, $4, $5 ) - ON CONFLICT ( name, stele ) DO NOTHING; - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(name) - .bind(date.to_string()) - .bind(stele) - .bind(last_valid_publication_name) - .bind(last_valid_version) - .execute(&mut *connection) - .await? - .last_insert_id() - } - }; - Ok(id) -} - -/// Upsert a new stele into the database. -/// -/// # Errors -/// Errors if the stele cannot be inserted into the database. -pub async fn create_stele(conn: &DatabaseConnection, stele: &str) -> anyhow::Result> { - let id = match conn.kind { - DatabaseKind::Sqlite => { - let statement = " - INSERT OR IGNORE INTO stele ( name ) - VALUES ( $1 ) - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(stele) - .execute(&mut *connection) - .await? - .last_insert_id() - } - DatabaseKind::Postgres => { - let statement = " - INSERT INTO stele ( name ) - VALUES ( $1 ) - ON CONFLICT ( name ) DO NOTHING; - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(stele) - .execute(&mut *connection) - .await? - .last_insert_id() - } - }; - Ok(id) -} - -/// Upsert a new version into the database. -/// -/// # Errors -/// Errors if the version cannot be inserted into the database. -pub async fn create_version( - conn: &DatabaseConnection, - codified_date: &str, -) -> anyhow::Result> { - let id = match conn.kind { - DatabaseKind::Sqlite => { - let statement = " - INSERT OR IGNORE INTO version ( codified_date ) - VALUES ( $1 ) - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(codified_date) - .execute(&mut *connection) - .await? 
- .last_insert_id() - } - DatabaseKind::Postgres => { - let statement = " - INSERT INTO version ( codified_date ) - VALUES ( $1 ) - ON CONFLICT ( codified_date ) DO NOTHING; - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(codified_date) - .execute(&mut *connection) - .await? - .last_insert_id() - } - }; - Ok(id) -} - -/// Upsert a new publication version into the database. -/// -/// # Errors -/// Errors if the publication version cannot be inserted into the database. -pub async fn create_publication_version( - conn: &DatabaseConnection, - publication: &str, - codified_date: &str, - stele: &str, -) -> anyhow::Result> { - let id = match conn.kind { - DatabaseKind::Sqlite => { - let statement = " - INSERT OR IGNORE INTO publication_version ( publication, version, stele ) - VALUES ( $1, $2, $3 ) - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(publication) - .bind(codified_date) - .bind(stele) - .execute(&mut *connection) - .await? - .last_insert_id() - } - DatabaseKind::Postgres => { - let statement = " - INSERT INTO publication_version ( publication, version, stele ) - VALUES ( $1, $2, $3 ) - ON CONFLICT ( publication, version, stele ) DO NOTHING; - "; - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(publication) - .bind(codified_date) - .bind(stele) - .execute(&mut *connection) - .await? - .last_insert_id() - } - }; - Ok(id) -} - -/// Update a publication by name and stele to be revoked. -/// -/// # Errors -/// Errors if the publication cannot be updated. 
-pub async fn update_publication_by_name_and_stele_set_revoked_true( - conn: &DatabaseConnection, - name: &str, - stele: &str, -) -> anyhow::Result<()> { - let statement = " - UPDATE publication - SET revoked = TRUE - WHERE name = $1 AND stele = $2 - "; - match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query(statement) - .bind(name) - .bind(stele) - .execute(&mut *connection) - .await?; - } - DatabaseKind::Postgres => { - unimplemented!() - } - } - Ok(()) -} diff --git a/src/db/statements/mod.rs b/src/db/statements/mod.rs deleted file mode 100644 index 3116fd5..0000000 --- a/src/db/statements/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Contains all the statements for the database -pub mod inserts; -// Contains all the queries for the database -pub mod queries; diff --git a/src/db/statements/queries.rs b/src/db/statements/queries.rs deleted file mode 100644 index 54cb529..0000000 --- a/src/db/statements/queries.rs +++ /dev/null @@ -1,516 +0,0 @@ -//! Central place for database queries - -use async_trait::async_trait; -use chrono::NaiveDate; - -use crate::db::models::publication::{self, Publication}; -use crate::db::models::publication_version::PublicationVersion; -use crate::db::models::stele::Stele; -use crate::db::models::version::Version; -use crate::db::models::{document_change, library_change}; -use crate::db::DatabaseConnection; -use std::collections::HashSet; - -use crate::db::DatabaseKind; - -/// Find a stele by `name`. -/// -/// # Errors -/// Errors if can't establish a connection to the database. -pub async fn find_stele_by_name(conn: &DatabaseConnection, name: &str) -> anyhow::Result { - let statement = " - SELECT * - FROM stele - WHERE name = $1 - "; - let row = match conn.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, Stele>(statement) - .bind(name) - .fetch_one(&mut *connection) - .await? 
- } - }; - Ok(row) -} - -/// Find the last inserted publication by `stele_id`. -/// This function is then used to incrementally insert new change objects -/// -/// # Errors -/// Errors if can't establish a connection to the database. -pub async fn find_last_inserted_publication( - conn: &DatabaseConnection, - stele: &str, -) -> anyhow::Result> { - let statement = " - SELECT * - FROM publication - WHERE revoked = 0 AND stele = $1 - ORDER BY date DESC - LIMIT 1 - "; - let row = match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, Publication>(statement) - .bind(stele) - .fetch_one(&mut *connection) - .await - .ok() - } - DatabaseKind::Postgres => { - unimplemented!() - } - }; - Ok(row) -} - -/// Find a publication by `name` and `date` and `stele_id`. -/// -/// # Errors -/// Errors if can't establish a connection to the database. -pub async fn find_publication_by_name_and_stele( - conn: &DatabaseConnection, - name: &str, - stele: &str, -) -> anyhow::Result { - let statement = " - SELECT * - FROM publication - WHERE name = $1 AND stele = $2 - "; - let row = match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, Publication>(statement) - .bind(name) - .bind(stele) - .fetch_one(&mut *connection) - .await? - } - DatabaseKind::Postgres => { - unimplemented!() - } - }; - Ok(row) -} - -/// Find a publication version by `publication_id` and `version`. -/// -/// # Errors -/// Errors if can't establish a connection to the database. 
-pub async fn find_all_publication_versions_by_publication_name_and_stele( - conn: &DatabaseConnection, - publication: &str, - stele: &str, -) -> anyhow::Result> { - let statement = " - SELECT * - FROM publication_version - WHERE publication = $1 AND stele = $2 - "; - let rows = match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, PublicationVersion>(statement) - .bind(publication) - .bind(stele) - .fetch_all(&mut *connection) - .await? - } - DatabaseKind::Postgres => { - unimplemented!() - } - }; - Ok(rows) -} - -/// Find all publication versions in `publications`. -async fn find_all_publication_versions_in_publication_has_publication_versions( - conn: &DatabaseConnection, - publications: Vec, - stele: &str, -) -> anyhow::Result> { - let parameters = publications - .iter() - .map(|_| "?") - .collect::>() - .join(", "); - let rows = match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - - let statement = format!(" - SELECT DISTINCT pv.publication, pv.version - FROM publication_version pv - LEFT JOIN publication_has_publication_versions phpv ON pv.publication = phpv.referenced_publication AND pv.version = phpv.referenced_version - WHERE phpv.publication IN ({parameters} AND pv.stele = ?) - "); - - let mut query = sqlx::query_as::<_, PublicationVersion>(&statement); - for publication in publications { - query = query.bind(publication); - } - query = query.bind(stele); - - query.fetch_all(&mut *connection).await? - } - DatabaseKind::Postgres => { - unimplemented!() - } - }; - Ok(rows) -} - -/// Recursively find all publication versions starting from a given publication ID. - -/// This is necessary publication versions can be the same across publications. -/// To make versions query simpler, we walk the publication hierarchy starting from -/// `publication_name` looking for related publications. 
-/// The function returns all the `publication_version` IDs, even in simple cases where a publication -/// has no hierarchy. -/// -/// # Errors -/// Errors if can't establish a connection to the database. -pub async fn find_publication_versions_for_publication( - conn: &DatabaseConnection, - publication_name: String, - stele: String, -) -> anyhow::Result> { - let mut versions: HashSet = - find_all_publication_versions_by_publication_name_and_stele( - conn, - &publication_name, - &stele, - ) - .await? - .into_iter() - .collect(); - - let mut checked_publication_names = HashSet::new(); - checked_publication_names.insert(publication_name.clone()); - - let mut publication_names_to_check = HashSet::new(); - publication_names_to_check.insert(publication_name); - - while !publication_names_to_check.is_empty() { - let new_versions: HashSet = - find_all_publication_versions_in_publication_has_publication_versions( - conn, - publication_names_to_check.clone().into_iter().collect(), - &stele, - ) - .await? - .into_iter() - .collect(); - versions.extend(new_versions.clone()); - - checked_publication_names.extend(publication_names_to_check.clone()); - - publication_names_to_check = new_versions - .clone() - .into_iter() - .filter(|pv| !checked_publication_names.contains(&pv.publication.clone())) - .map(|pv| pv.publication) - .collect(); - } - Ok(versions.into_iter().collect()) -} - -/// Find all publication names by date and stele. -/// -/// # Errors -/// Errors if can't establish a connection to the database. 
-pub async fn find_all_publications_by_date_and_stele_order_by_name_desc( - conn: &DatabaseConnection, - date: String, - stele: String, -) -> anyhow::Result> { - let statement = " - SELECT * - FROM publication - WHERE date = $1 AND stele = $2 - ORDER BY name DESC - "; - let rows = match conn.kind { - DatabaseKind::Sqlite => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, Publication>(statement) - .bind(date) - .bind(stele) - .fetch_all(&mut *connection) - .await? - } - DatabaseKind::Postgres => { - unimplemented!(); - } - }; - Ok(rows) -} - -/// Find last inserted publication version in DB. -/// Used when partially inserted new changes to the database. -/// -/// # Errors -/// Errors if can't establish a connection to the database. -pub async fn find_last_inserted_publication_version_by_publication_and_stele( - conn: &DatabaseConnection, - publication: &str, - stele: &str, -) -> anyhow::Result> { - let statement = " - SELECT * - FROM publication_version - WHERE publication = $1 AND stele = $2 - ORDER BY version DESC - LIMIT 1 - "; - let row = match conn.kind { - DatabaseKind::Sqlite | DatabaseKind::Postgres => { - let mut connection = conn.pool.acquire().await?; - sqlx::query_as::<_, PublicationVersion>(statement) - .bind(publication) - .bind(stele) - .fetch_one(&mut *connection) - .await - .ok() - } - }; - Ok(row) -} - -#[async_trait] -impl document_change::Manager for DatabaseConnection { - /// Find one document materialized path by url. - /// - /// # Errors - /// Errors if can't establish a connection to the database. - async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result { - let statement = " - SELECT doc_mpath - FROM document_change - WHERE url = $1 - LIMIT 1 - "; - let row = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, (String,)>(statement) - .bind(url) - .fetch_one(&mut *connection) - .await? 
- } - }; - Ok(row.0) - } - - /// All dates on which given document changed. - /// - /// # Errors - /// Errors if can't establish a connection to the database. - async fn find_all_document_versions_by_mpath_and_publication( - &self, - mpath: &str, - publication: &str, - ) -> anyhow::Result> { - let mut statement = " - SELECT DISTINCT phpv.referenced_version as codified_date - FROM document_change dc - LEFT JOIN publication_has_publication_versions phpv - ON dc.publication = phpv.referenced_publication - AND dc.version = phpv.referenced_version - WHERE dc.doc_mpath LIKE $1 AND phpv.publication = $2 - "; - let mut rows = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Version>(statement) - .bind(format!("{mpath}%")) - .bind(publication) - .fetch_all(&mut *connection) - .await? - } - }; - statement = " - SELECT phpv.referenced_version as codified_date - FROM document_change dc - LEFT JOIN publication_has_publication_versions phpv - ON dc.publication = phpv.referenced_publication - AND dc.version = phpv.referenced_version - WHERE dc.doc_mpath = $1 - AND phpv.publication = $2 - AND dc.status = 'Element added' - LIMIT 1 - "; - let element_added = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Version>(statement) - .bind(mpath) - .bind(publication) - .fetch_one(&mut *connection) - .await - .ok() - } - }; - - if element_added.is_none() { - // When element doesn't have date added, it means we're looking - // at an old publication and this element doesn't yet exist in it - rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); - return Ok(rows); - } - - statement = " - SELECT phpv.referenced_version as codified_date - FROM document_change dc - LEFT JOIN publication_has_publication_versions phpv - ON dc.publication = phpv.referenced_publication - AND dc.version = 
phpv.referenced_version - WHERE dc.doc_mpath = $1 - AND phpv.publication = $2 - AND dc.status = 'Element effective' - LIMIT 1 - "; - let mut doc = mpath.split('|').next().unwrap_or("").to_owned(); - doc.push('|'); - - let document_effective = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Version>(statement) - .bind(doc) - .bind(publication) - .fetch_one(&mut *connection) - .await - .ok() - } - }; - - if let (Some(doc_effective), Some(el_added)) = (document_effective, element_added) { - if !rows.contains(&doc_effective) - && NaiveDate::parse_from_str(&doc_effective.codified_date, "%Y-%m-%d") - .unwrap_or_default() - > NaiveDate::parse_from_str(&el_added.codified_date, "%Y-%m-%d") - .unwrap_or_default() - { - rows.push(doc_effective); - } - } - rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); - Ok(rows) - } -} - -#[async_trait] -impl library_change::Manager for DatabaseConnection { - /// Find one library materialized path by url. - /// - /// # Errors - /// Errors if can't establish a connection to the database. - async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result { - let statement = " - SELECT library_mpath - FROM library_change - WHERE url = $1 - LIMIT 1 - "; - let row = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, (String,)>(statement) - .bind(url) - .fetch_one(&mut *connection) - .await? - } - }; - Ok(row.0) - } - /// All dates on which documents from this collection changed. - /// - /// # Errors - /// Errors if can't establish a connection to the database. 
- async fn find_all_collection_versions_by_mpath_and_publication( - &self, - mpath: &str, - publication: &str, - ) -> anyhow::Result> { - let mut statement = " - SELECT DISTINCT phpv.referenced_version as codified_date - FROM changed_library_document cld - LEFT JOIN publication_has_publication_versions phpv - ON cld.publication = phpv.referenced_publication - AND cld.version = phpv.referenced_version - WHERE cld.library_mpath LIKE $1 AND phpv.publication = $2 - "; - let mut rows = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Version>(statement) - .bind(format!("{mpath}%")) - .bind(publication) - .fetch_all(&mut *connection) - .await? - } - }; - statement = " - SELECT DISTINCT phpv.referenced_version as codified_date - FROM library_change lc - LEFT JOIN publication_has_publication_versions phpv - ON lc.publication = phpv.referenced_publication - AND lc.version = phpv.referenced_version - WHERE lc.library_mpath LIKE $1 AND lc.status = 'Element added' AND phpv.publication = $2 - LIMIT 1 - "; - let element_added = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Version>(statement) - .bind(format!("{mpath}%")) - .bind(publication) - .fetch_one(&mut *connection) - .await - .ok() - } - }; - - if let Some(el_added) = element_added { - if !rows.contains(&el_added) { - rows.push(el_added); - } - } - rows.sort_by(|v1, v2| v2.codified_date.cmp(&v1.codified_date)); - Ok(rows) - } -} - -#[async_trait] -impl publication::Manager for DatabaseConnection { - /// Find all publications which are not revoked for a given stele. - /// - /// # Errors - /// Errors if can't establish a connection to the database. 
- async fn find_all_non_revoked_publications( - &self, - stele: &str, - ) -> anyhow::Result> { - let statement = " - SELECT * - FROM publication - WHERE revoked = 0 AND stele = $1 - ORDER BY name DESC - "; - let rows = match self.kind { - DatabaseKind::Postgres | DatabaseKind::Sqlite => { - let mut connection = self.pool.acquire().await?; - sqlx::query_as::<_, Publication>(statement) - .bind(stele) - .fetch_all(&mut *connection) - .await? - } - }; - Ok(rows) - } -} diff --git a/src/history/changes.rs b/src/history/changes.rs index 104dc64..a8d1950 100644 --- a/src/history/changes.rs +++ b/src/history/changes.rs @@ -1,27 +1,32 @@ //! Module for inserting changes into the database -#![allow(clippy::exit, clippy::shadow_reuse, clippy::future_not_send)] -use crate::db::models::changed_library_document::ChangedLibraryDocument; -use crate::db::models::document_change::DocumentChange; -use crate::db::models::library::Library; -use crate::db::models::library_change::LibraryChange; -use crate::db::models::publication::Publication; -use crate::db::models::publication_has_publication_versions::PublicationHasPublicationVersions; -use crate::db::statements::inserts::{ - create_document, create_publication, create_publication_version, create_stele, create_version, - insert_changed_library_document_bulk, insert_document_changes_bulk, insert_library_bulk, - insert_library_changes_bulk, insert_publication_has_publication_versions_bulk, - update_publication_by_name_and_stele_set_revoked_true, -}; -use crate::db::statements::queries::{ - find_all_publications_by_date_and_stele_order_by_name_desc, find_last_inserted_publication, - find_last_inserted_publication_version_by_publication_and_stele, - find_publication_by_name_and_stele, find_publication_versions_for_publication, +#![allow( + clippy::exit, + clippy::shadow_reuse, + clippy::future_not_send, + clippy::string_add +)] +use super::rdf::graph::Bag; +use crate::db::models::changed_library_document::{self, ChangedLibraryDocument}; 
+use crate::db::models::document_change::{self, DocumentChange}; +use crate::db::models::document_element::DocumentElement; +use crate::db::models::library::{self, Library}; +use crate::db::models::library_change::{self, LibraryChange}; +use crate::db::models::publication::{self, Publication}; +use crate::db::models::publication_has_publication_versions::{ + self, PublicationHasPublicationVersions, }; +use crate::db::models::publication_version; +use crate::db::models::status::Status; +use crate::db::models::{document, document_element}; +use crate::db::models::{stele, version}; +use crate::db::{DatabaseTransaction, Tx}; use crate::history::rdf::graph::StelaeGraph; use crate::history::rdf::namespaces::{dcterms, oll}; +use crate::server::errors::CliError; use crate::stelae::stele::Stele; use crate::utils::archive::get_name_parts; use crate::utils::git::Repo; +use crate::utils::md5; use crate::{ db::{self, DatabaseConnection}, stelae::archive::Archive, @@ -36,49 +41,34 @@ use std::{ borrow::ToOwned, io::{self, BufReader}, path::{Path, PathBuf}, - process, result::Result, }; -use super::rdf::graph::Bag; - /// Inserts changes from the archive into the database /// /// # Errors /// Errors if the changes cannot be inserted into the archive #[actix_web::main] -pub async fn insert( - raw_archive_path: &str, - archive_path: PathBuf, - stele: Option, -) -> io::Result<()> { +#[tracing::instrument(name = "Stelae update", skip(raw_archive_path, archive_path))] +pub async fn insert(raw_archive_path: &str, archive_path: PathBuf) -> Result<(), CliError> { let conn = match db::init::connect(&archive_path).await { Ok(conn) => conn, Err(err) => { tracing::error!( - "error: could not connect to database. Confirm that DATABASE_URL env var is set correctly." + "error: could not connect to database. + Confirm that `db.sqlite3` exists in `.stelae` dir or that DATABASE_URL env var is set correctly." 
); - tracing::error!("Error: {:?}", err); - process::exit(1); + tracing::error!("Error: {err:?}"); + return Err(CliError::DatabaseConnectionError); } }; - tracing::info!("Inserting history into archive"); - if let Some(_stele) = stele { - insert_changes_single_stele()?; - } else { - insert_changes_archive(&conn, raw_archive_path, &archive_path) - .await - .unwrap_or_else(|err| { - tracing::error!("Failed to insert changes into archive"); - tracing::error!("{:?}", err); - }); - } - Ok(()) -} - -/// Insert changes for a single stele instead of an entire archive -fn insert_changes_single_stele() -> io::Result<()> { - unimplemented!() + insert_changes_archive(&conn, raw_archive_path, &archive_path) + .await + .map_err(|err| { + tracing::error!("Failed to insert changes into archive"); + tracing::error!("{err:?}"); + CliError::GenericError + }) } /// Insert changes from the archive into the database @@ -87,6 +77,8 @@ async fn insert_changes_archive( raw_archive_path: &str, archive_path: &Path, ) -> anyhow::Result<()> { + tracing::debug!("Inserting history into archive"); + let archive = Archive::parse( archive_path.to_path_buf(), &PathBuf::from(raw_archive_path), @@ -97,15 +89,11 @@ async fn insert_changes_archive( for (name, mut stele) in archive.get_stelae() { match process_stele(conn, &name, &mut stele, archive_path).await { Ok(()) => (), - Err(err) => errors.push(err), + Err(err) => errors.push(format!("{name}: {err}")), } } if !errors.is_empty() { - let error_msg = errors - .into_iter() - .map(|err| err.to_string()) - .collect::>() - .join("\n"); + let error_msg = errors.into_iter().collect::>().join("\n"); return Err(anyhow::anyhow!( "Errors occurred while inserting changes:\n{error_msg}" )); @@ -151,13 +139,17 @@ async fn insert_changes_from_rdf_repository( ) -> anyhow::Result<()> { tracing::info!("Inserting changes from RDF repository: {}", stele_id); tracing::info!("RDF repository path: {}", rdf_repo.path.display()); - let tx = conn.pool.begin().await?; - match 
load_delta_for_stele(conn, &rdf_repo, stele_id).await { + let mut tx = DatabaseTransaction { + tx: conn.pool.begin().await?, + }; + match load_delta_for_stele(&mut tx, &rdf_repo, stele_id).await { Ok(()) => { + tracing::debug!("Applying transaction for stele: {stele_id}"); tx.commit().await?; Ok(()) } Err(err) => { + tracing::error!("Rolling back transaction for stele: {stele_id} due to error: {err:?}"); tx.rollback().await?; Err(err) } @@ -166,17 +158,17 @@ async fn insert_changes_from_rdf_repository( /// Load deltas from the publications async fn load_delta_for_stele( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, rdf_repo: &Repo, stele: &str, ) -> anyhow::Result<()> { - create_stele(conn, stele).await?; - if let Some(publication) = find_last_inserted_publication(conn, stele).await? { + stele::TxManager::create(tx, stele).await?; + if let Some(publication) = publication::TxManager::find_last_inserted(tx, stele).await? { tracing::info!("Inserting changes from last inserted publication"); - load_delta_from_publications(conn, rdf_repo, stele, Some(publication)).await?; + load_delta_from_publications(tx, rdf_repo, stele, Some(publication)).await?; } else { tracing::info!("Inserting changes from beginning for stele: {}", stele); - load_delta_from_publications(conn, rdf_repo, stele, None).await?; + load_delta_from_publications(tx, rdf_repo, stele, None).await?; } Ok(()) } @@ -187,7 +179,7 @@ async fn load_delta_for_stele( /// Errors if the delta cannot be loaded from the publications #[allow(clippy::unwrap_used)] async fn load_delta_from_publications( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, rdf_repo: &Repo, stele: &str, last_inserted_publication: Option, @@ -225,41 +217,63 @@ async fn load_delta_from_publications( // skip past publications since they are already in db continue; } - last_inserted_date = find_last_inserted_publication_version_by_publication_and_stele( - conn, &pub_name, stele, - ) - .await? 
- .map(|pv| { - NaiveDate::parse_from_str(&pv.version, "%Y-%m-%d").context("Could not parse date") - }) - .and_then(Result::ok); + last_inserted_date = + publication_version::TxManager::find_last_inserted_date_by_publication_id( + tx, + &last_inserted_pub.id, + ) + .await? + .map(|pv| { + NaiveDate::parse_from_str(&pv.version, "%Y-%m-%d") + .context("Could not parse date") + }) + .and_then(Result::ok); } tracing::info!("[{stele}] | Publication: {pub_name}"); publication_tree.walk(TreeWalkMode::PreOrder, |_, entry| { let path_name = entry.name().unwrap_or_default(); if path_name.contains(".rdf") { - let current_blob = rdf_repo.repo.find_blob(entry.id()).unwrap(); - let current_content = current_blob.content(); - parser::parse_bufread(BufReader::new(current_content)) - .add_to_graph(&mut pub_graph.fast_graph) - .unwrap(); + match rdf_repo.repo.find_blob(entry.id()) { + Ok(current_blob) => { + let current_content = current_blob.content(); + if let Err(err) = parser::parse_bufread(BufReader::new(current_content)) + .add_to_graph(&mut pub_graph.fast_graph) + { + tracing::error!( + "Error adding content to graph for entry {path_name}: {err:?}" + ); + } + } + Err(err) => { + tracing::error!("Error finding blob for entry {path_name}: {err:?}"); + } + } } TreeWalkResult::Ok })?; - let (last_valid_pub_name, last_valid_codified_date) = referenced_publication_information(&pub_graph); - create_publication( - conn, + let publication_hash = md5::compute(pub_name.clone() + stele); + let last_inserted_pub_id = if let Some(valid_pub_name) = last_valid_pub_name { + let last_inserted_pub = + publication::TxManager::find_by_name_and_stele(tx, &valid_pub_name, stele).await?; + Some(last_inserted_pub.id) + } else { + None + }; + publication::TxManager::create( + tx, + &publication_hash, &pub_name, &pub_date, stele, - last_valid_pub_name, + last_inserted_pub_id, last_valid_codified_date, ) .await?; - let publication = find_publication_by_name_and_stele(conn, &pub_name, stele).await?; - 
load_delta_for_publication(conn, publication, &pub_graph, last_inserted_date).await?; + let publication = + publication::TxManager::find_by_name_and_stele(tx, &pub_name, stele).await?; + load_delta_for_publication(tx, publication, &pub_graph, last_inserted_date).await?; } Ok(()) } @@ -269,7 +283,7 @@ async fn load_delta_from_publications( /// # Errors /// Errors if database connection fails or if delta cannot be loaded for the publication async fn load_delta_for_publication( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, publication: Publication, pub_graph: &StelaeGraph, last_inserted_date: Option, @@ -280,7 +294,7 @@ async fn load_delta_for_publication( pub_graph.all_iris_from_triple_matching(None, None, Some(oll::CollectionVersion))?; insert_document_changes( - conn, + tx, &last_inserted_date, pub_document_versions, pub_graph, @@ -289,27 +303,30 @@ async fn load_delta_for_publication( .await?; insert_library_changes( - conn, + tx, &last_inserted_date, pub_collection_versions, pub_graph, &publication, ) .await?; - insert_shared_publication_versions_for_publication(conn, &publication).await?; - revoke_same_date_publications(conn, publication).await?; + insert_shared_publication_versions_for_publication(tx, &publication).await?; + + revoke_same_date_publications(tx, publication).await?; + Ok(()) } /// Insert document changes into the database async fn insert_document_changes( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, last_inserted_date: &Option, pub_document_versions: Vec<&SimpleTerm<'_>>, pub_graph: &StelaeGraph, publication: &Publication, ) -> anyhow::Result<()> { + let mut document_elements_bulk: Vec = vec![]; let mut document_changes_bulk: Vec = vec![]; for version in pub_document_versions { let codified_date = @@ -321,13 +338,19 @@ async fn insert_document_changes( continue; } } - create_version(conn, &codified_date).await?; - create_publication_version(conn, &publication.name, &codified_date, &publication.stele) - .await?; + 
version::TxManager::create(tx, &codified_date).await?; + let pub_version_hash = + md5::compute(publication.name.clone() + &codified_date + &publication.stele); + publication_version::TxManager::create( + tx, + &pub_version_hash, + &publication.id, + &codified_date, + ) + .await?; let doc_id = pub_graph.literal_from_triple_matching(Some(version), Some(oll::docId), None)?; - create_document(conn, &doc_id).await?; - + document::TxManager::create(tx, &doc_id).await?; let changes_uri = pub_graph.iri_from_triple_matching(Some(version), Some(oll::hasChanges), None)?; let changes = Bag::new(pub_graph, changes_uri); @@ -339,6 +362,11 @@ async fn insert_document_changes( )?; let url = pub_graph.literal_from_triple_matching(Some(&change), Some(oll::url), None)?; + document_elements_bulk.push(DocumentElement::new( + doc_mpath.clone(), + url.clone(), + doc_id.clone(), + )); let reason = pub_graph .literal_from_triple_matching(Some(&change), Some(oll::reason), None) .ok(); @@ -347,27 +375,29 @@ async fn insert_document_changes( Some(oll::status), None, )?; - for status in statuses { - document_changes_bulk.push(DocumentChange { - doc_mpath: doc_mpath.clone(), - status: status.clone(), - url: url.clone(), - change_reason: reason.clone(), - publication: publication.name.clone(), - version: codified_date.clone(), - stele: publication.stele.clone(), - doc_id: doc_id.clone(), - }); + for el_status in statuses { + let status = Status::from_string(&el_status)?; + let document_change_hash = md5::compute( + pub_version_hash.clone() + &doc_mpath.clone() + &status.to_int().to_string(), + ); + document_changes_bulk.push(DocumentChange::new( + document_change_hash, + status.to_int(), + reason.clone(), + pub_version_hash.clone(), + doc_mpath.clone(), + )); } } } - insert_document_changes_bulk(conn, document_changes_bulk).await?; + document_element::TxManager::insert_bulk(tx, document_elements_bulk).await?; + document_change::TxManager::insert_bulk(tx, document_changes_bulk).await?; Ok(()) } 
/// Insert library changes into the database async fn insert_library_changes( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, last_inserted_date: &Option, pub_collection_versions: Vec<&SimpleTerm<'_>>, pub_graph: &StelaeGraph, @@ -392,24 +422,22 @@ async fn insert_library_changes( None, )?; let url = pub_graph.literal_from_triple_matching(Some(version), Some(oll::url), None)?; - let status = + let el_status = pub_graph.literal_from_triple_matching(Some(version), Some(oll::status), None)?; - library_bulk.push(Library { - mpath: library_mpath.clone(), - }); - library_changes_bulk.push(LibraryChange { - library_mpath: library_mpath.clone(), - publication: publication.name.clone(), - version: codified_date.clone(), - stele: publication.stele.clone(), - status: status.clone(), - url: url.clone(), - }); + let library_status = Status::from_string(&el_status)?; + library_bulk.push(Library::new(library_mpath.clone(), url.clone())); + let pub_version_hash = + md5::compute(publication.name.clone() + &codified_date + &publication.stele); + library_changes_bulk.push(LibraryChange::new( + pub_version_hash.clone(), + library_status.to_int(), + library_mpath.clone(), + )); let changes_uri = pub_graph.iri_from_triple_matching(Some(version), Some(oll::hasChanges), None)?; let changes = Bag::new(pub_graph, changes_uri); for change in changes.items()? 
{ - let Ok(el_status) = + let Ok(found_status) = pub_graph.literal_from_triple_matching(Some(&change), Some(oll::status), None) else { continue; @@ -421,20 +449,19 @@ async fn insert_library_changes( ) else { continue; }; - changed_library_document_bulk.push(ChangedLibraryDocument { - publication: publication.name.clone(), - version: codified_date.clone(), - stele: publication.stele.clone(), - doc_mpath: doc_mpath.clone(), - status: el_status.clone(), - library_mpath: library_mpath.clone(), - url: url.clone(), - }); + let status = Status::from_string(&found_status)?; + let document_change_hash = md5::compute( + pub_version_hash.clone() + &doc_mpath.clone() + &status.to_int().to_string(), + ); + changed_library_document_bulk.push(ChangedLibraryDocument::new( + document_change_hash, + library_mpath.clone(), + )); } } - insert_library_bulk(conn, library_bulk).await?; - insert_library_changes_bulk(conn, library_changes_bulk).await?; - insert_changed_library_document_bulk(conn, changed_library_document_bulk).await?; + library::TxManager::insert_bulk(tx, library_bulk).await?; + library_change::TxManager::insert_bulk(tx, library_changes_bulk).await?; + changed_library_document::TxManager::insert_bulk(tx, changed_library_document_bulk).await?; Ok(()) } @@ -442,39 +469,37 @@ async fn insert_library_changes( /// Support for lightweight publications. 
/// Populate the many-to-many mapping between change objects and publications async fn insert_shared_publication_versions_for_publication( - conn: &DatabaseConnection, + tx: &mut DatabaseTransaction, publication: &Publication, ) -> anyhow::Result<()> { let mut publication_has_publication_versions_bulk: Vec = vec![]; - let mut publication_versions = find_publication_versions_for_publication( - conn, - publication.name.clone(), - publication.stele.clone(), - ) - .await?; - if let (Some(last_valid_pub_name), Some(_)) = ( - publication.last_valid_publication_name.as_ref(), - publication.last_valid_version.as_ref(), - ) { - let publication_versions_last_valid = find_publication_versions_for_publication( - conn, - last_valid_pub_name.clone(), - publication.stele.clone(), + let mut publication_version_ids = + publication_version::TxManager::find_all_recursive_for_publication( + tx, + publication.id.clone(), ) .await?; - publication_versions.extend(publication_versions_last_valid); + if let (Some(last_valid_pub_id), Some(_)) = ( + publication.last_valid_publication_id.as_ref(), + publication.last_valid_version.as_ref(), + ) { + let publication_version_ids_last_valid = + publication_version::TxManager::find_all_recursive_for_publication( + tx, + last_valid_pub_id.clone(), + ) + .await?; + publication_version_ids.extend(publication_version_ids_last_valid); } - publication_has_publication_versions_bulk.extend(publication_versions.iter().map(|pv| { + publication_has_publication_versions_bulk.extend(publication_version_ids.iter().map(|pv| { PublicationHasPublicationVersions { - publication: publication.name.clone(), - referenced_publication: pv.publication.clone(), - referenced_version: pv.version.clone(), - stele: publication.stele.clone(), + publication_id: publication.id.clone(), + publication_version_id: pv.id.clone(), } })); - insert_publication_has_publication_versions_bulk( - conn, + publication_has_publication_versions::TxManager::insert_bulk( + tx, 
publication_has_publication_versions_bulk, ) .await?; @@ -499,19 +524,20 @@ fn referenced_publication_information(pub_graph: &StelaeGraph) -> (Option anyhow::Result<()> { - let duplicate_publications = find_all_publications_by_date_and_stele_order_by_name_desc( - conn, - publication.date, - publication.stele, - ) - .await?; + let duplicate_publications = + publication::TxManager::find_all_by_date_and_stele_order_by_name_desc( + tx, + publication.date, + publication.stele, + ) + .await?; if let Some(duplicate_publications_slice) = duplicate_publications.get(1..) { for duplicate_pub in duplicate_publications_slice { - update_publication_by_name_and_stele_set_revoked_true( - conn, + publication::TxManager::update_by_name_and_stele_set_revoked_true( + tx, &duplicate_pub.name, &duplicate_pub.stele, ) diff --git a/src/main.rs b/src/main.rs index eda87c5..4ff20b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,6 @@ use stelae::utils::cli::run; -fn main() -> std::io::Result<()> { +fn main() { run() } diff --git a/src/server/api/versions/mod.rs b/src/server/api/versions/mod.rs index d67c487..05e4c15 100644 --- a/src/server/api/versions/mod.rs +++ b/src/server/api/versions/mod.rs @@ -7,7 +7,7 @@ use std::convert::Into; use crate::{ db::{ models::{ - document_change, library_change, + document_change, document_element, library, library_change, publication::{self, Publication}, }, DatabaseConnection, @@ -130,6 +130,7 @@ pub async fn versions( publications.insert( 0, Publication::new( + current_publication.id.clone(), CURRENT_PUBLICATION_NAME.to_owned(), current_publication.date.clone(), current_publication.stele.clone(), @@ -156,25 +157,25 @@ async fn publication_versions( ) -> Vec { tracing::debug!("Fetching publication versions for '{url}'"); let mut versions = vec![]; - let doc_mpath = document_change::Manager::find_doc_mpath_by_url(db, &url).await; + let doc_mpath = document_element::Manager::find_doc_mpath_by_url(db, &url).await; if let Ok(mpath) = doc_mpath { let 
doc_versions = document_change::Manager::find_all_document_versions_by_mpath_and_publication( db, &mpath, - &publication.name, + &publication.id, ) .await .unwrap_or_default(); versions = doc_versions.into_iter().map(Into::into).collect(); } else { - let lib_mpath = library_change::Manager::find_lib_mpath_by_url(db, &url).await; + let lib_mpath = library::Manager::find_lib_mpath_by_url(db, &url).await; if let Ok(mpath) = lib_mpath { let coll_versions = library_change::Manager::find_all_collection_versions_by_mpath_and_publication( db, &mpath, - &publication.name, + &publication.id, ) .await .unwrap_or_default(); diff --git a/src/server/app.rs b/src/server/app.rs index d2f92df..cbbbfbc 100644 --- a/src/server/app.rs +++ b/src/server/app.rs @@ -7,12 +7,13 @@ )] use crate::db; use crate::server::api::state::App as AppState; +use crate::server::errors::CliError; use crate::stelae::archive::Archive; use actix_web::dev::{ServiceRequest, ServiceResponse}; use actix_web::{App, Error, HttpServer}; use tracing_actix_web::TracingLogger; -use std::{io, path::PathBuf, process}; +use std::{path::PathBuf, process}; use actix_http::body::MessageBody; use actix_service::ServiceFactory; @@ -29,7 +30,7 @@ pub async fn serve_archive( archive_path: PathBuf, port: u16, individual: bool, -) -> io::Result<()> { +) -> Result<(), CliError> { let bind = "127.0.0.1"; let message = "Running Publish Server on a Stelae archive at"; tracing::info!("{message} '{raw_archive_path}' on http://{bind}:{port}.",); @@ -41,28 +42,40 @@ pub async fn serve_archive( "error: could not connect to database. Confirm that DATABASE_URL env var is set correctly." 
); tracing::error!("Error: {:?}", err); - process::exit(1); + return Err(CliError::DatabaseConnectionError); } }; - let archive = Archive::parse(archive_path, &PathBuf::from(raw_archive_path), individual) - .unwrap_or_else(|err| { + let archive = match Archive::parse(archive_path, &PathBuf::from(raw_archive_path), individual) { + Ok(archive) => archive, + Err(err) => { tracing::error!("Unable to parse archive at '{raw_archive_path}'."); - tracing::error!("Error: {:?}", err); - process::exit(1); - }); + tracing::error!("Error: {err:?}"); + return Err(CliError::ArchiveParseError); + } + }; + let state = AppState { archive, db }; HttpServer::new(move || { init_app(&state).unwrap_or_else(|err| { tracing::error!("Unable to initialize app."); - tracing::error!("Error: {:?}", err); - process::exit(1); + tracing::error!("Error: {err:?}"); + // NOTE: We should not need to exit code 1 here (or in any of the closures in `routes.rs`). + // We should be able to return an error and let the caller handle it. + // However, Actix does not allow us to instantiate the app outside of the closure, + // because the opaque type `App` does not implement `Clone`. + // Figure out a way to handle this without exiting the process. + process::exit(1) }) }) .bind((bind, port))? .run() .await + .map_err(|err| { + tracing::error!("Error running server: {err:?}"); + CliError::GenericError + }) } /// Initialize the application and all possible routing at start-up time. 
diff --git a/src/server/errors.rs b/src/server/errors.rs index a733dde..8c20ed0 100644 --- a/src/server/errors.rs +++ b/src/server/errors.rs @@ -8,8 +8,29 @@ use actix_web::{error, http::StatusCode, HttpResponse}; use derive_more::{Display, Error}; +use std::io; -/// Collection of possible Stelae errors +/// Collection of possible CLI errors +#[derive(Debug, Display, Error)] +pub enum CliError { + /// Database connection error + #[display(fmt = "Failed to connect to the database")] + DatabaseConnectionError, + /// A generic fallback error + #[display(fmt = "A CLI error occurred")] + GenericError, + /// Errors during archive parsing + #[display(fmt = "Failed to parse the archive ")] + ArchiveParseError, +} + +impl From for CliError { + fn from(_error: io::Error) -> Self { + CliError::GenericError + } +} + +/// Collection of possible stelae web-facing errors #[derive(Debug, Display, Error)] pub enum StelaeError { /// Errors generated by the Git server diff --git a/src/server/git.rs b/src/server/git.rs index b08f699..4254053 100644 --- a/src/server/git.rs +++ b/src/server/git.rs @@ -11,13 +11,10 @@ use actix_web::{get, web, App, HttpResponse, HttpServer, Responder}; use git2::{self, ErrorCode}; -use std::{ - io, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use tracing_actix_web::TracingLogger; -use super::errors::StelaeError; +use super::errors::{CliError, StelaeError}; use crate::utils::git::{Repo, GIT_REQUEST_NOT_FOUND}; use crate::utils::http::get_contenttype; use crate::{server::tracing::StelaeRootSpanBuilder, utils::paths::clean_path}; @@ -103,7 +100,11 @@ fn blob_error_response(error: &anyhow::Error, namespace: &str, name: &str) -> Ht /// Serve git repositories in the Stelae archive. 
#[actix_web::main] // or #[tokio::main] -pub async fn serve_git(raw_archive_path: &str, archive_path: PathBuf, port: u16) -> io::Result<()> { +pub async fn serve_git( + raw_archive_path: &str, + archive_path: PathBuf, + port: u16, +) -> Result<(), CliError> { let bind = "127.0.0.1"; let message = "Serving content from the Stelae archive at"; tracing::info!("{message} '{raw_archive_path}' on http://{bind}:{port}.",); @@ -121,4 +122,8 @@ pub async fn serve_git(raw_archive_path: &str, archive_path: PathBuf, port: u16) .bind((bind, port))? .run() .await + .map_err(|err| { + tracing::error!("Error running Git server: {err:?}"); + CliError::GenericError + }) } diff --git a/src/utils/cli.rs b/src/utils/cli.rs index 00d0a5f..46dc153 100644 --- a/src/utils/cli.rs +++ b/src/utils/cli.rs @@ -5,11 +5,11 @@ use crate::history::changes; use crate::server::app::serve_archive; +use crate::server::errors::CliError; use crate::server::git::serve_git; use crate::utils::archive::find_archive_path; use clap::Parser; use std::env; -use std::io; use std::path::Path; use std::path::PathBuf; use std::process; @@ -53,13 +53,14 @@ enum Subcommands { /// Serve an individual stele instead of the Stele specified in config.toml. individual: bool, }, - /// Insert historical information about the Steles in the archive. - /// Populates the database with change objects loaded in from RDF repository - /// By default inserts historical information for the root Stele (and all referenced stele) in the archive - InsertHistory { - /// Optionally insert historical information for this Stele only. - stele: Option, - }, + /// Update the archive + /// + /// NOTE: Once TAF is embedded with stelae, this command will be used to update the repositories within the archive. + /// Currently inserts historical information about the Steles in the archive. 
+ /// + /// - Populates the database with change objects loaded in from RDF repository + /// - By default inserts historical information for the root and all referenced stele in the archive + Update, } /// Place to initialize tracing @@ -102,11 +103,24 @@ fn init_tracing(archive_path: &Path) { } } -/// Main entrypoint to application +/// Central place to execute commands /// /// # Errors -/// TODO: This function should not return errors -pub fn run() -> io::Result<()> { +/// This function returns the generic `CliError`, based on which we exit with a known exit code. +fn execute_command(cli: &Cli, archive_path: PathBuf) -> Result<(), CliError> { + match cli.subcommands { + Subcommands::Git { port } => serve_git(&cli.archive_path, archive_path, port), + Subcommands::Serve { port, individual } => { + serve_archive(&cli.archive_path, archive_path, port, individual) + } + Subcommands::Update => changes::insert(&cli.archive_path, archive_path), + } +} + +/// Main entrypoint to application +/// +/// Exits with 1 if we encounter an error +pub fn run() { tracing::debug!("Starting application"); let cli = Cli::parse(); let archive_path_wd = Path::new(&cli.archive_path); @@ -120,13 +134,12 @@ pub fn run() -> io::Result<()> { init_tracing(&archive_path); - match cli.subcommands { - Subcommands::Git { port } => serve_git(&cli.archive_path, archive_path, port), - Subcommands::Serve { port, individual } => { - serve_archive(&cli.archive_path, archive_path, port, individual) - } - Subcommands::InsertHistory { stele } => { - changes::insert(&cli.archive_path, archive_path, stele) + match execute_command(&cli, archive_path) { + Ok(()) => process::exit(0), + Err(err) => { + // Exit with 1 if we encounter an error + tracing::error!("Application error: {err:?}"); + process::exit(1); } } } diff --git a/src/utils/md5.rs b/src/utils/md5.rs new file mode 100644 index 0000000..5221183 --- /dev/null +++ b/src/utils/md5.rs @@ -0,0 +1,39 @@ +//! Utility functions for `md5` hash computation. 
+use md5::{Digest, Md5}; + +/// Compute the `md5` hash of a string. +/// +/// The result is a hexadecimal string of 32 characters. +#[must_use] +pub fn compute(data: String) -> String { + let mut hasher = Md5::new(); + hasher.update(data); + let result = hasher.finalize(); + format!("{result:x}") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compute() { + let data = "hello world".to_string(); + let result = compute(data); + assert_eq!(result, "5eb63bbbe01eeed093cb22bb8f5acdc3"); + } + + #[test] + fn test_compute_empty() { + let data = "".to_string(); + let result = compute(data); + assert_eq!(result, "d41d8cd98f00b204e9800998ecf8427e"); + } + + #[test] + fn test_compute_unicode() { + let data = "😋".to_string(); + let result = compute(data); + assert_eq!(result, "a0a836f06f8bd1b45d2f70db1e334b5d"); + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c08d434..125fa30 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -4,4 +4,5 @@ pub mod archive; pub mod cli; pub mod git; pub mod http; +pub mod md5; pub mod paths;