fix: store documents when ingesting a dataset
Ingesting a dataset did not store the source documents of the ingested files in the storage backend. This change fixes that.
ctron committed Sep 16, 2024
1 parent 3cc7b55 commit 600dd36
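
In effect, documents ingested as part of a dataset had database records but no retrievable content. A self-contained toy sketch of the store/retrieve round-trip the fix restores (the in-memory type and digest key are illustrative only, not the project's actual `StorageBackend` API, which is async and streams content):

```rust
use std::collections::HashMap;

/// Illustrative in-memory stand-in for a content-addressed storage
/// backend, keyed by digest.
#[derive(Default)]
struct MemoryStore {
    by_digest: HashMap<String, Vec<u8>>,
}

impl MemoryStore {
    fn store(&mut self, digest: &str, data: &[u8]) {
        self.by_digest.insert(digest.to_owned(), data.to_vec());
    }

    fn retrieve(&self, digest: &str) -> Option<&[u8]> {
        self.by_digest.get(digest).map(Vec::as_slice)
    }
}

fn main() {
    let mut store = MemoryStore::default();
    let doc = b"example SBOM content";

    // Before this commit, dataset ingestion skipped the `store` step,
    // so a later `retrieve` by digest returned `None` and the download
    // endpoint answered 404.
    store.store("sha256:...", doc);
    assert_eq!(store.retrieve("sha256:..."), Some(doc.as_slice()));
}
```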
Showing 8 changed files with 119 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions modules/fundamental/Cargo.toml
@@ -58,6 +58,7 @@ urlencoding = { workspace = true }
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
csaf = { workspace = true }
packageurl = { workspace = true }
walkdir = { workspace = true }
zip = { workspace = true }

[[bench]]
12 changes: 9 additions & 3 deletions modules/fundamental/src/advisory/endpoints/mod.rs
@@ -209,17 +209,23 @@ pub async fn download(
        return Ok(HttpResponse::NotFound().finish());
    };

    log::debug!("Found document - hashes: {:?}", advisory.head.hashes);

    let stream = ingestor
        .get_ref()
        .storage()
        .clone()
        .retrieve(advisory.head.hashes.try_into()?)
        .await
        .map_err(Error::Storage)?
        .map(|stream| stream.map_err(Error::Storage));

    Ok(match stream {
        Some(s) => HttpResponse::Ok().streaming(s),
        None => HttpResponse::NotFound().finish(),
        None => {
            tracing::warn!(
                uuid = ?advisory.head.uuid,
                "Found the document, but not its content"
            );
            HttpResponse::NotFound().finish()
        }
    })
}
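
The final `match` follows the usual actix-web pattern of turning an optional byte stream into either a streaming `200` or a `404`. A minimal sketch of that pattern, with a plain `Vec<Bytes>` standing in for the storage stream:

```rust
use actix_web::{web::Bytes, HttpResponse};
use futures_util::stream;

// `None` models the "metadata exists, but the content was never
// stored" case that the new warning above makes visible.
async fn respond(found: Option<Vec<Bytes>>) -> HttpResponse {
    match found {
        Some(chunks) => HttpResponse::Ok().streaming(stream::iter(
            chunks.into_iter().map(Ok::<_, std::io::Error>),
        )),
        None => HttpResponse::NotFound().finish(),
    }
}
```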
88 changes: 88 additions & 0 deletions modules/fundamental/tests/dataset.rs
@@ -0,0 +1,88 @@
#![allow(clippy::unwrap_used)]

use bytes::BytesMut;
use futures_util::StreamExt;
use std::{
    io::{Cursor, Write},
    time::Instant,
};
use test_context::test_context;
use test_log::test;
use tracing::instrument;
use trustify_common::id::Id;
use trustify_module_fundamental::sbom::service::SbomService;
use trustify_module_storage::service::StorageBackend;
use trustify_test_context::TrustifyContext;
use zip::write::FileOptions;

/// Test ingesting a dataset.
#[test_context(TrustifyContext, skip_teardown)]
#[test(tokio::test)]
#[instrument]
async fn ingest(ctx: TrustifyContext) -> anyhow::Result<()> {
    let service = SbomService::new(ctx.db.clone());
    let storage = &ctx.storage;

    let start = Instant::now();

    // create dataset ad-hoc

    let base = ctx.absolute_path("../datasets/ds3")?;
    let mut data = vec![];
    let mut dataset = zip::write::ZipWriter::new(Cursor::new(&mut data));
    for entry in walkdir::WalkDir::new(&base) {
        let entry = entry?;
        let Ok(path) = entry.path().strip_prefix(&base) else {
            continue;
        };

        if entry.file_type().is_file() {
            dataset.start_file_from_path(path, FileOptions::<()>::default())?;
            dataset.write_all(&(std::fs::read(entry.path())?))?;
        } else if entry.file_type().is_dir() {
            dataset.add_directory_from_path(path, FileOptions::<()>::default())?;
        }
    }
    dataset.finish()?;

    // ingest

    let result = ctx.ingestor.ingest_dataset(&data, ()).await?;

    let ingest_time = start.elapsed();

    // check ingest results

    log::info!("ingest: {}", humantime::Duration::from(ingest_time));

    assert!(result.warnings.is_empty());
    assert_eq!(result.files.len(), 64);

    // get a document

    let sbom = &result.files["spdx/quarkus-bom-2.13.8.Final-redhat-00004.json.bz2"];
    assert!(matches!(sbom.id, Id::Uuid(_)));

    let sbom_summary = service.fetch_sbom_summary(sbom.id.clone(), ()).await?;
    assert!(sbom_summary.is_some());
    let sbom_summary = sbom_summary.unwrap();
    assert_eq!(sbom_summary.head.name, "quarkus-bom");

    // test source document

    let stream = storage
        .retrieve(sbom_summary.head.hashes.try_into()?)
        .await?;
    assert!(stream.is_some());
    let mut stream = stream.unwrap();
    let mut content = BytesMut::new();
    while let Some(data) = stream.next().await {
        content.extend(&data?);
    }

    assert_eq!(content.len(), 1174356);

    // done

    Ok(())
}
13 changes: 11 additions & 2 deletions modules/ingestor/src/service/dataset/mod.rs
@@ -5,6 +5,7 @@ use crate::{
    model::IngestResult,
    service::{Error, Format, Warnings},
};
use anyhow::anyhow;
use bytes::Bytes;
use sbom_walker::common::compression::decompress;
use std::{
@@ -13,17 +14,20 @@ use std::{
    str::FromStr,
};
use tokio::runtime::Handle;
use tokio_util::io::ReaderStream;
use tracing::instrument;
use trustify_common::hashing::Digests;
use trustify_entity::labels::Labels;
use trustify_module_storage::{service::dispatch::DispatchBackend, service::StorageBackend};

pub struct DatasetLoader<'g> {
    graph: &'g Graph,
    storage: &'g DispatchBackend,
}

impl<'g> DatasetLoader<'g> {
    pub fn new(graph: &'g Graph) -> Self {
        Self { graph }
    pub fn new(graph: &'g Graph, storage: &'g DispatchBackend) -> Self {
        Self { graph, storage }
    }

    #[instrument(skip(self, buffer), ret)]
@@ -74,6 +78,11 @@ impl<'g> DatasetLoader<'g> {

        let labels = labels.clone().add("datasetFile", &full_name);

        self.storage
            .store(ReaderStream::new(&*data))
            .await
            .map_err(|err| Error::Storage(anyhow!("{err}")))?;

        // We need to box it, to work around async recursion limits
        let result = Box::pin({
            async move {
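
The key line is `.store(ReaderStream::new(&*data))`: tokio implements `AsyncRead` for `&[u8]`, so `ReaderStream` turns the in-memory buffer into the chunked byte stream the storage backend consumes. A small standalone sketch of that conversion:

```rust
use futures_util::StreamExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let data: Vec<u8> = b"document bytes".to_vec();

    // `&*data` is a `&[u8]`, which implements `AsyncRead`, so
    // `ReaderStream` yields the buffer as `io::Result<Bytes>` chunks.
    let mut stream = ReaderStream::new(&*data);

    let mut total = 0;
    while let Some(chunk) = stream.next().await {
        total += chunk?.len();
    }
    assert_eq!(total, data.len());
    Ok(())
}
```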
2 changes: 1 addition & 1 deletion modules/ingestor/src/service/mod.rs
@@ -219,7 +219,7 @@ impl IngestorService {
        bytes: &[u8],
        labels: impl Into<Labels> + Debug,
    ) -> Result<DatasetIngestResult, Error> {
        let loader = DatasetLoader::new(self.graph());
        let loader = DatasetLoader::new(self.graph(), self.storage());
        loader.load(labels.into(), bytes).await
    }
}
2 changes: 2 additions & 0 deletions modules/storage/src/service/fs.rs
@@ -127,6 +127,8 @@ impl StorageBackend for FileSystemBackend {
        create_dir_all(&target).await?;
        let target = target.join(hash);

        log::debug!("Opening file: {}", target.display());

        let file = match File::open(&target).await {
            Ok(file) => Some(file),
            Err(err) if err.kind() == ErrorKind::NotFound => None,
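
For context, the surrounding `match` maps a missing file to `None` and propagates every other I/O error. The pattern in isolation (a sketch, not the backend's actual code):

```rust
use std::{io::ErrorKind, path::Path};
use tokio::fs::File;

// Missing content is an expected state (it becomes a 404 upstream);
// any other I/O error is still surfaced to the caller.
async fn open_optional(path: &Path) -> std::io::Result<Option<File>> {
    match File::open(path).await {
        Ok(file) => Ok(Some(file)),
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
```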
8 changes: 6 additions & 2 deletions test-context/src/lib.rs
@@ -5,7 +5,7 @@ use peak_alloc::PeakAlloc;
use postgresql_embedded::PostgreSQL;
use std::env;
use std::io::{Read, Seek};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use test_context::AsyncTestContext;
use tokio_util::bytes::Bytes;
use tokio_util::io::{ReaderStream, SyncIoBridge};
@@ -90,6 +90,10 @@ impl TrustifyContext {
            .ingest(&bytes, Format::Unknown, ("source", "TrustifyContext"), None)
            .await?)
    }

    pub fn absolute_path(&self, path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
        absolute(path)
    }
}

impl AsyncTestContext for TrustifyContext {
@@ -128,7 +132,7 @@
    }
}

fn absolute(path: &str) -> Result<PathBuf, anyhow::Error> {
fn absolute(path: impl AsRef<Path>) -> Result<PathBuf, anyhow::Error> {
    let workspace_root: PathBuf = env!("CARGO_WORKSPACE_ROOT").into();
    let test_data = workspace_root.join("etc/test-data");
    Ok(test_data.join(path))
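
The broadened `impl AsRef<Path>` signature is what lets the new test pass a relative path like `../datasets/ds3`, which `join` simply appends under `etc/test-data`. A sketch with an invented workspace root (`CARGO_WORKSPACE_ROOT` is presumably set by the project's cargo configuration; it is not a built-in cargo variable):

```rust
use std::path::PathBuf;

fn main() {
    // Hypothetical value standing in for env!("CARGO_WORKSPACE_ROOT").
    let workspace_root = PathBuf::from("/home/user/trustify");
    let test_data = workspace_root.join("etc/test-data");

    // `join` keeps the `..` component as-is; no normalization happens.
    let dataset = test_data.join("../datasets/ds3");
    assert!(dataset.ends_with("etc/test-data/../datasets/ds3"));
}
```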
