Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rdf): fixes to insert history command #46

Merged
merged 6 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "stelae"
description = "A collection of tools in Rust and Python for preserving, authenticating, and accessing laws in perpetuity."
version = "0.3.0-alpha.4"
version = "0.3.0-alpha.5"
edition = "2021"
readme = "README.md"
license = "AGPL-3.0"
Expand Down
10 changes: 10 additions & 0 deletions migrations/sqlite/20240115152953_initial_db.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@
PRAGMA foreign_keys = OFF;

DROP INDEX IF EXISTS changed_library_document_library_mpath_idx;
DROP INDEX IF EXISTS library_change_status_idx;
DROP INDEX IF EXISTS library_change_publication_version_idx;
DROP INDEX IF EXISTS library_change_library_mpath_idx;

DROP INDEX IF EXISTS publication_version_id_idx;

DROP INDEX IF EXISTS publication_has_publication_versions_publication_version_id_idx;
DROP INDEX IF EXISTS publication_has_publication_versions_publication_id_idx;

DROP INDEX IF EXISTS document_change_doc_mpath_pub_id_status_idx;
DROP INDEX IF EXISTS document_change_publication_version_idx;
DROP INDEX IF EXISTS document_change_doc_mpath_idx;

DROP TABLE IF EXISTS changed_library_document;
Expand Down
20 changes: 18 additions & 2 deletions migrations/sqlite/20240115152953_initial_db.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ CREATE TABLE document_element (
doc_mpath TEXT,
url TEXT,
doc_id TEXT,
stele TEXT,
CONSTRAINT fk_doc_id
FOREIGN KEY (doc_id)
REFERENCES document(doc_id),
PRIMARY KEY (doc_mpath)
PRIMARY KEY (doc_mpath),
CONSTRAINT fk_stele FOREIGN KEY (stele) REFERENCES stele(name) ON DELETE CASCADE
);
CREATE TABLE library (
mpath TEXT PRIMARY KEY,
url TEXT
url TEXT,
stele TEXT,
CONSTRAINT fk_stele FOREIGN KEY (stele) REFERENCES stele(name) ON DELETE CASCADE
);
CREATE TABLE publication (
id TEXT,
Expand Down Expand Up @@ -80,7 +84,16 @@ CREATE TABLE document_change (
ON DELETE CASCADE,
PRIMARY KEY (id)
);

CREATE INDEX document_change_doc_mpath_idx ON document_change(doc_mpath COLLATE NOCASE);
CREATE INDEX document_change_publication_version_idx ON document_change(publication_version_id);
CREATE INDEX document_change_doc_mpath_pub_id_status_idx ON document_change(doc_mpath COLLATE NOCASE, status);

CREATE INDEX publication_has_publication_versions_publication_id_idx ON publication_has_publication_versions(publication_id);
CREATE INDEX publication_has_publication_versions_publication_version_id_idx ON publication_has_publication_versions(publication_version_id);

CREATE INDEX publication_version_id_idx ON publication_version(id);

CREATE TABLE library_change (
publication_version_id TEXT,
status TEXT,
Expand All @@ -101,6 +114,9 @@ CREATE TABLE changed_library_document (
PRIMARY KEY (document_change_id, library_mpath)
);
CREATE INDEX library_change_library_mpath_idx ON library_change(library_mpath COLLATE NOCASE);
CREATE INDEX library_change_publication_version_idx ON library_change(publication_version_id);
CREATE INDEX library_change_status_idx ON library_change(library_mpath COLLATE NOCASE, status);

CREATE INDEX changed_library_document_library_mpath_idx ON changed_library_document(library_mpath COLLATE NOCASE);

PRAGMA optimize;
1 change: 1 addition & 0 deletions src/db/models/changed_library_document/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl super::TxManager for DatabaseTransaction {
});
let query = query_builder.build();
query.execute(&mut *self.tx).await?;
query_builder.reset();
}
Ok(())
}
Expand Down
13 changes: 8 additions & 5 deletions src/db/models/document_element/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ impl super::Manager for DatabaseConnection {
///
/// # Errors
/// Errors if can't establish a connection to the database.
async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result<String> {
async fn find_doc_mpath_by_url(&self, url: &str, stele: &str) -> anyhow::Result<String> {
let statement = "
SELECT de.doc_mpath
FROM document_element de
WHERE de.url = $1
WHERE de.url = $1 AND de.stele = $2
LIMIT 1
";
let row = match self.kind {
DatabaseKind::Postgres | DatabaseKind::Sqlite => {
let mut connection = self.pool.acquire().await?;
sqlx::query_as::<_, (String,)>(statement)
.bind(url)
.bind(stele)
.fetch_one(&mut *connection)
.await?
}
Expand All @@ -39,14 +40,16 @@ impl super::TxManager for DatabaseTransaction {
/// # Errors
/// Errors if the document elements cannot be inserted into the database.
async fn insert_bulk(&mut self, document_elements: Vec<DocumentElement>) -> anyhow::Result<()> {
let mut query_builder =
QueryBuilder::new("INSERT OR IGNORE INTO document_element ( doc_mpath, url, doc_id ) ");
let mut query_builder = QueryBuilder::new(
"INSERT OR IGNORE INTO document_element ( doc_mpath, url, doc_id, stele ) ",
);
for chunk in document_elements.chunks(BATCH_SIZE) {
query_builder.push_values(chunk, |mut bindings, de| {
bindings
.push_bind(&de.doc_mpath)
.push_bind(&de.url)
.push_bind(&de.doc_id);
.push_bind(&de.doc_id)
.push_bind(&de.stele);
});
let query = query_builder.build();
query.execute(&mut *self.tx).await?;
Expand Down
7 changes: 5 additions & 2 deletions src/db/models/document_element/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod manager;
#[async_trait]
pub trait Manager {
/// Find one document materialized path by url.
async fn find_doc_mpath_by_url(&self, url: &str) -> anyhow::Result<String>;
async fn find_doc_mpath_by_url(&self, url: &str, stele: &str) -> anyhow::Result<String>;
}

/// Trait for managing transactional document elements.
Expand All @@ -26,16 +26,19 @@ pub struct DocumentElement {
pub url: String,
/// Unique document identifier.
pub doc_id: String,
/// Reference to the stele.
pub stele: String,
}

impl DocumentElement {
/// Create a new document element.
#[must_use]
pub const fn new(doc_mpath: String, url: String, doc_id: String) -> Self {
pub const fn new(doc_mpath: String, url: String, doc_id: String, stele: String) -> Self {
Self {
doc_mpath,
url,
doc_id,
stele,
}
}
}
14 changes: 10 additions & 4 deletions src/db/models/library/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ impl super::Manager for DatabaseConnection {
///
/// # Errors
/// Errors if can't establish a connection to the database.
async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result<String> {
async fn find_lib_mpath_by_url(&self, url: &str, stele: &str) -> anyhow::Result<String> {
let statement = "
SELECT l.mpath
FROM library l
WHERE l.url = $1
WHERE l.url = $1 AND l.stele = $2
LIMIT 1
";
let row = match self.kind {
DatabaseKind::Postgres | DatabaseKind::Sqlite => {
let mut connection = self.pool.acquire().await?;
sqlx::query_as::<_, (String,)>(statement)
.bind(url)
.bind(stele)
.fetch_one(&mut *connection)
.await?
}
Expand All @@ -37,13 +38,18 @@ impl super::TxManager for DatabaseTransaction {
/// # Errors
/// Errors if the libraries cannot be inserted into the database.
async fn insert_bulk(&mut self, libraries: Vec<Library>) -> anyhow::Result<()> {
let mut query_builder = QueryBuilder::new("INSERT OR IGNORE INTO library ( mpath, url ) ");
let mut query_builder =
QueryBuilder::new("INSERT OR IGNORE INTO library ( mpath, url, stele ) ");
for chunk in libraries.chunks(BATCH_SIZE) {
query_builder.push_values(chunk, |mut bindings, lb| {
bindings.push_bind(&lb.mpath).push_bind(&lb.url);
bindings
.push_bind(&lb.mpath)
.push_bind(&lb.url)
.push_bind(&lb.stele);
});
let query = query_builder.build();
query.execute(&mut *self.tx).await?;
query_builder.reset();
}
Ok(())
}
Expand Down
8 changes: 5 additions & 3 deletions src/db/models/library/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod manager;
#[async_trait]
pub trait Manager {
/// Find one library materialized path by url.
async fn find_lib_mpath_by_url(&self, url: &str) -> anyhow::Result<String>;
async fn find_lib_mpath_by_url(&self, url: &str, stele: &str) -> anyhow::Result<String>;
}

/// Trait for managing transactions on publication versions.
Expand All @@ -24,12 +24,14 @@ pub struct Library {
pub mpath: String,
/// Url to the collection.
pub url: String,
/// Reference to the stele.
pub stele: String,
}

impl Library {
/// Create a new library.
#[must_use]
pub const fn new(mpath: String, url: String) -> Self {
Self { mpath, url }
pub const fn new(mpath: String, url: String, stele: String) -> Self {
Self { mpath, url, stele }
}
}
1 change: 1 addition & 0 deletions src/db/models/library_change/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl super::TxManager for DatabaseTransaction {
});
let query = query_builder.build();
query.execute(&mut *self.tx).await?;
query_builder.reset();
}
Ok(())
}
Expand Down
14 changes: 11 additions & 3 deletions src/history/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ async fn insert_document_changes(
let doc_id =
pub_graph.literal_from_triple_matching(Some(version), Some(oll::docId), None)?;
document::TxManager::create(tx, &doc_id).await?;
let changes_uri =
pub_graph.iri_from_triple_matching(Some(version), Some(oll::hasChanges), None)?;
let Ok(changes_uri) =
pub_graph.iri_from_triple_matching(Some(version), Some(oll::hasChanges), None)
else {
continue;
};
let changes = Bag::new(pub_graph, changes_uri);
for change in changes.items()? {
let doc_mpath = pub_graph.literal_from_triple_matching(
Expand All @@ -366,6 +369,7 @@ async fn insert_document_changes(
doc_mpath.clone(),
url.clone(),
doc_id.clone(),
publication.stele.clone(),
));
let reason = pub_graph
.literal_from_triple_matching(Some(&change), Some(oll::reason), None)
Expand Down Expand Up @@ -425,7 +429,11 @@ async fn insert_library_changes(
let el_status =
pub_graph.literal_from_triple_matching(Some(version), Some(oll::status), None)?;
let library_status = Status::from_string(&el_status)?;
library_bulk.push(Library::new(library_mpath.clone(), url.clone()));
library_bulk.push(Library::new(
library_mpath.clone(),
url.clone(),
publication.stele.clone(),
));
let pub_version_hash =
md5::compute(publication.name.clone() + &codified_date + &publication.stele);
library_changes_bulk.push(LibraryChange::new(
Expand Down
4 changes: 3 additions & 1 deletion src/history/rdf/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ impl StelaeGraph {
let triple = self
.triples_matching_inner(subject, predicate, object)
.next()
.context("Expected to find triple matching")?;
.context(format!(
"Expected to find triple matching s={subject:?}, p={predicate:?}, o={object:?}"
))?;
Ok(triple?)
}

Expand Down
5 changes: 3 additions & 2 deletions src/server/api/versions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ async fn publication_versions(
) -> Vec<response::Version> {
tracing::debug!("Fetching publication versions for '{url}'");
let mut versions = vec![];
let doc_mpath = document_element::Manager::find_doc_mpath_by_url(db, &url).await;
let doc_mpath =
document_element::Manager::find_doc_mpath_by_url(db, &url, &publication.stele).await;
if let Ok(mpath) = doc_mpath {
let doc_versions =
document_change::Manager::find_all_document_versions_by_mpath_and_publication(
Expand All @@ -169,7 +170,7 @@ async fn publication_versions(
.unwrap_or_default();
versions = doc_versions.into_iter().map(Into::into).collect();
} else {
let lib_mpath = library::Manager::find_lib_mpath_by_url(db, &url).await;
let lib_mpath = library::Manager::find_lib_mpath_by_url(db, &url, &publication.stele).await;
if let Ok(mpath) = lib_mpath {
let coll_versions =
library_change::Manager::find_all_collection_versions_by_mpath_and_publication(
Expand Down
Loading