diff --git a/crates/api/src/v1/package.rs b/crates/api/src/v1/package.rs index e8ac9506..eae61a2a 100644 --- a/crates/api/src/v1/package.rs +++ b/crates/api/src/v1/package.rs @@ -6,8 +6,8 @@ use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; use warg_crypto::hash::AnyHash; use warg_protocol::{ - registry::{LogId, MapCheckpoint, PackageId, RecordId}, - ProtoEnvelopeBody, SerdeEnvelope, + registry::{LogId, PackageId, RecordId}, + ProtoEnvelopeBody, }; /// Represents the supported kinds of content sources. @@ -45,16 +45,6 @@ pub struct PackageRecord { } impl PackageRecord { - /// Gets the checkpoint of the record. - /// - /// Returns `None` if the record hasn't been published yet. - pub fn checkpoint(&self) -> Option<&SerdeEnvelope> { - match &self.state { - PackageRecordState::Published { checkpoint, .. } => Some(checkpoint), - _ => None, - } - } - /// Gets the missing content digests of the record. pub fn missing_content(&self) -> &[AnyHash] { match &self.state { @@ -89,10 +79,10 @@ pub enum PackageRecordState { }, /// The package record was successfully published to the log. Published { - /// The checkpoint that the record was included in. - checkpoint: SerdeEnvelope, /// The envelope of the package record. record: ProtoEnvelopeBody, + /// The index of the record in the registry log. + registry_log_index: u32, /// The content sources of the record. content_sources: HashMap>, }, diff --git a/crates/crypto/src/hash/static.rs b/crates/crypto/src/hash/static.rs index 98688eda..8a4bd2ca 100644 --- a/crates/crypto/src/hash/static.rs +++ b/crates/crypto/src/hash/static.rs @@ -9,7 +9,7 @@ use crate::{ByteVisitor, VisitBytes}; use super::{Output, SupportedDigest}; -#[derive(PartialOrd, Ord)] +#[derive(Default, PartialOrd, Ord)] pub struct Hash { pub(crate) digest: Output, } diff --git a/crates/protocol/src/serde_envelope.rs b/crates/protocol/src/serde_envelope.rs index aa1f6500..b160bbc0 100644 --- a/crates/protocol/src/serde_envelope.rs +++ b/crates/protocol/src/serde_envelope.rs @@ -45,6 +45,10 @@ impl SerdeEnvelope { }) } + pub fn into_contents(self) -> Contents { + self.contents + } + pub fn key_id(&self) -> &signing::KeyID { &self.key_id } diff --git a/crates/server/openapi.yaml b/crates/server/openapi.yaml index 520646bb..4da61026 100644 --- a/crates/server/openapi.yaml +++ b/crates/server/openapi.yaml @@ -746,9 +746,9 @@ components: description: The state of the package record. enum: [published] example: published - checkpoint: - "$ref": "#/components/schemas/SignedMapCheckpoint" - description: The checkpoint of the record. + registryLogIndex: + type: number + description: The index of the record in the registry log. record: "$ref": "#/components/schemas/EnvelopeBody" description: The package record. diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index f87c9519..f2a0606e 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -289,7 +289,7 @@ async fn get_record( id: record_id, state: PackageRecordState::Published { record: record.envelope.into(), - checkpoint: record.checkpoint.unwrap(), + registry_log_index: record.registry_log_index.unwrap(), content_sources, }, })) diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index d4ecd52b..d5aa3593 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -11,14 +11,18 @@ use warg_crypto::{hash::AnyHash, Signable}; use warg_protocol::{ operator, package::{self, PackageEntry}, - registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, + registry::{LogId, MapCheckpoint, PackageId, RecordId}, ProtoEnvelope, SerdeEnvelope, }; +struct Entry { + registry_index: u32, + record_content: ProtoEnvelope, +} + struct Log { validator: V, - entries: Vec>, - checkpoint_indices: Vec, + entries: Vec>, } impl Default for Log @@ -29,7 +33,6 @@ where Self { validator: V::default(), entries: Vec::new(), - checkpoint_indices: Vec::new(), } } } @@ -37,8 +40,8 @@ where struct Record { /// Index in the log's entries. index: usize, - /// Index in the checkpoints map. - checkpoint_index: Option, + /// Index in the registry's log. + registry_index: u32, } enum PendingRecord { @@ -77,13 +80,6 @@ struct State { records: HashMap>, } -fn get_records_before_checkpoint(indices: &[usize], checkpoint_index: usize) -> usize { - indices - .iter() - .filter(|index| **index <= checkpoint_index) - .count() -} - /// Represents an in-memory data store. /// /// Data is not persisted between restarts of the server. @@ -161,10 +157,11 @@ impl DataStore for MemoryDataStore { Ok(()) } - async fn validate_operator_record( + async fn commit_operator_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; @@ -189,10 +186,13 @@ impl DataStore for MemoryDataStore { { Ok(_) => { let index = log.entries.len(); - log.entries.push(record); + log.entries.push(Entry { + registry_index: registry_log_index, + record_content: record, + }); *status = RecordStatus::Validated(Record { index, - checkpoint_index: None, + registry_index: registry_log_index, }); Ok(()) } @@ -266,10 +266,11 @@ impl DataStore for MemoryDataStore { Ok(()) } - async fn validate_package_record( + async fn commit_package_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; @@ -294,10 +295,13 @@ impl DataStore for MemoryDataStore { { Ok(_) => { let index = log.entries.len(); - log.entries.push(record); + log.entries.push(Entry { + registry_index: registry_log_index, + record_content: record, + }); *status = RecordStatus::Validated(Record { index, - checkpoint_index: None, + registry_index: registry_log_index, }); Ok(()) } @@ -380,38 +384,14 @@ impl DataStore for MemoryDataStore { &self, checkpoint_id: &AnyHash, checkpoint: SerdeEnvelope, - participants: &[LogLeaf], ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; - let (index, prev) = state + let (_, prev) = state .checkpoints .insert_full(checkpoint_id.clone(), checkpoint); assert!(prev.is_none()); - for leaf in participants { - if let Some(log) = state.operators.get_mut(&leaf.log_id) { - log.checkpoint_indices.push(index); - } else if let Some(log) = state.packages.get_mut(&leaf.log_id) { - log.checkpoint_indices.push(index); - } else { - unreachable!("log not found"); - } - - match state - .records - .get_mut(&leaf.log_id) - .unwrap() - .get_mut(&leaf.record_id) - .unwrap() - { - RecordStatus::Validated(record) => { - record.checkpoint_index = Some(index); - } - _ => unreachable!(), - } - } - Ok(()) } @@ -435,20 +415,27 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - if let Some(checkpoint_index) = state.checkpoints.get_index_of(checkpoint_id) { - let start = match since { - Some(since) => match &state.records[log_id][since] { - RecordStatus::Validated(record) => record.index + 1, - _ => unreachable!(), - }, - None => 0, - }; - - let end = get_records_before_checkpoint(&log.checkpoint_indices, checkpoint_index); - Ok(log.entries[start..std::cmp::min(end, start + limit as usize)].to_vec()) - } else { - Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())) - } + let Some(checkpoint) = state.checkpoints.get(checkpoint_id) else { + return Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())); + }; + + let start_log_idx = match since { + Some(since) => match &state.records[log_id][since] { + RecordStatus::Validated(record) => record.index + 1, + _ => unreachable!(), + }, + None => 0, + }; + let end_registry_idx = checkpoint.as_ref().log_length; + + Ok(log + .entries + .iter() + .skip(start_log_idx) + .take_while(|entry| entry.registry_index < end_registry_idx) + .map(|entry| entry.record_content.clone()) + .take(limit as usize) + .collect()) } async fn get_package_records( @@ -465,20 +452,27 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - if let Some(checkpoint_index) = state.checkpoints.get_index_of(checkpoint_id) { - let start = match since { - Some(since) => match &state.records[log_id][since] { - RecordStatus::Validated(record) => record.index + 1, - _ => unreachable!(), - }, - None => 0, - }; - - let end = get_records_before_checkpoint(&log.checkpoint_indices, checkpoint_index); - Ok(log.entries[start..std::cmp::min(end, start + limit as usize)].to_vec()) - } else { - Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())) - } + let Some(checkpoint) = state.checkpoints.get(checkpoint_id) else { + return Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())); + }; + + let start_log_idx = match since { + Some(since) => match &state.records[log_id][since] { + RecordStatus::Validated(record) => record.index + 1, + _ => unreachable!(), + }, + None => 0, + }; + let end_registry_idx = checkpoint.as_ref().log_length; + + Ok(log + .entries + .iter() + .skip(start_log_idx) + .take_while(|entry| entry.registry_index < end_registry_idx) + .map(|entry| entry.record_content.clone()) + .take(limit as usize) + .collect()) } async fn get_operator_record( @@ -494,7 +488,7 @@ impl DataStore for MemoryDataStore { .get(record_id) .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone()))?; - let (status, envelope, checkpoint) = match status { + let (status, envelope, registry_log_index) = match status { RecordStatus::Pending(PendingRecord::Operator { record, .. }) => { (super::RecordStatus::Pending, record.clone().unwrap(), None) } @@ -509,16 +503,20 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - let checkpoint = r.checkpoint_index.map(|i| state.checkpoints[i].clone()); + let published_length = state + .checkpoints + .last() + .map(|(_, c)| c.as_ref().log_length) + .unwrap_or_default(); ( - if checkpoint.is_some() { + if r.registry_index < published_length { super::RecordStatus::Published } else { super::RecordStatus::Validated }, - log.entries[r.index].clone(), - checkpoint, + log.entries[r.index].record_content.clone(), + Some(r.registry_index), ) } _ => return Err(DataStoreError::RecordNotFound(record_id.clone())), @@ -527,7 +525,7 @@ impl DataStore for MemoryDataStore { Ok(super::Record { status, envelope, - checkpoint, + registry_log_index, }) } @@ -544,7 +542,7 @@ impl DataStore for MemoryDataStore { .get(record_id) .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone()))?; - let (status, envelope, checkpoint) = match status { + let (status, envelope, registry_log_index) = match status { RecordStatus::Pending(PendingRecord::Package { record, .. }) => { (super::RecordStatus::Pending, record.clone().unwrap(), None) } @@ -559,16 +557,20 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - let checkpoint = r.checkpoint_index.map(|i| state.checkpoints[i].clone()); + let published_length = state + .checkpoints + .last() + .map(|(_, c)| c.as_ref().log_length) + .unwrap_or_default(); ( - if checkpoint.is_some() { + if r.registry_index < published_length { super::RecordStatus::Published } else { super::RecordStatus::Validated }, - log.entries[r.index].clone(), - checkpoint, + log.entries[r.index].record_content.clone(), + Some(r.registry_index), ) } _ => return Err(DataStoreError::RecordNotFound(record_id.clone())), @@ -577,7 +579,7 @@ impl DataStore for MemoryDataStore { Ok(super::Record { status, envelope, - checkpoint, + registry_log_index, }) } diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index fa271741..f686a176 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -98,10 +98,10 @@ where pub status: RecordStatus, /// The envelope containing the record contents. pub envelope: ProtoEnvelope, - /// The checkpoint of the record. + /// The index of the record in the registry log. /// /// This is `None` if the record is not published. - pub checkpoint: Option>, + pub registry_log_index: Option, } /// Implemented by data stores. @@ -135,15 +135,16 @@ pub trait DataStore: Send + Sync { reason: &str, ) -> Result<(), DataStoreError>; - /// Validates the given operator record. + /// Commits the given operator record. /// /// The record must be in a pending state. /// /// If validation succeeds, the record will be considered part of the log. - async fn validate_operator_record( + async fn commit_operator_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError>; /// Stores the given package record. @@ -169,15 +170,16 @@ pub trait DataStore: Send + Sync { reason: &str, ) -> Result<(), DataStoreError>; - /// Validates the given package record. + /// Commits the given package record. /// /// The record must be in a pending state. /// /// If validation succeeds, the record will be considered part of the log. - async fn validate_package_record( + async fn commit_package_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError>; /// Determines if the given content digest is missing for the record. @@ -210,7 +212,6 @@ pub trait DataStore: Send + Sync { &self, checkpoint_id: &AnyHash, checkpoint: SerdeEnvelope, - participants: &[LogLeaf], ) -> Result<(), DataStoreError>; /// Gets the latest checkpoint. diff --git a/crates/server/src/datastore/postgres/migrations/2023-04-11-013700_initial_db/up.sql b/crates/server/src/datastore/postgres/migrations/2023-04-11-013700_initial_db/up.sql index 91d4946b..0d982847 100644 --- a/crates/server/src/datastore/postgres/migrations/2023-04-11-013700_initial_db/up.sql +++ b/crates/server/src/datastore/postgres/migrations/2023-04-11-013700_initial_db/up.sql @@ -33,6 +33,7 @@ CREATE TABLE records ( id SERIAL PRIMARY KEY, log_id INTEGER NOT NULL REFERENCES logs(id), record_id TEXT NOT NULL UNIQUE, + registry_log_index INTEGER UNIQUE, checkpoint_id INTEGER REFERENCES checkpoints(id), content BYTEA NOT NULL, status record_status NOT NULL DEFAULT 'pending', diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 6538709d..e6efae80 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -1,6 +1,6 @@ use self::models::{ - Checkpoint, CheckpointData, NewCheckpoint, NewContent, NewLog, NewRecord, ParsedText, - RecordContent, RecordStatus, TextRef, + Checkpoint, NewCheckpoint, NewContent, NewLog, NewRecord, ParsedText, RecordContent, + RecordStatus, TextRef, }; use super::{DataStore, DataStoreError, InitialLeaf, Record}; use anyhow::{anyhow, Result}; @@ -190,16 +190,18 @@ async fn reject_record( Ok(()) } -async fn validate_record( +async fn commit_record( conn: &mut AsyncPgConnection, log_id: i32, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError> where V: Validator + 'static, ::Error: ToString + Send + Sync, DataStoreError: From<::Error>, { + let registry_log_index: i32 = registry_log_index.try_into().unwrap(); conn.transaction::<_, DataStoreError, _>(|conn| { async move { // Get the record content and validator @@ -242,7 +244,10 @@ where // Finally, mark the record as validated diesel::update(schema::records::table) .filter(schema::records::id.eq(id)) - .set(schema::records::status.eq(RecordStatus::Validated)) + .set(( + schema::records::status.eq(RecordStatus::Validated), + schema::records::registry_log_index.eq(Some(registry_log_index)), + )) .execute(conn) .await?; @@ -271,18 +276,14 @@ where .optional()? .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - let (record, checkpoint) = schema::records::table - .left_join(schema::checkpoints::table) - .select(( - RecordContent::as_select(), - Option::::as_select(), - )) + let record = schema::records::table + .select(RecordContent::as_select()) .filter( schema::records::record_id .eq(TextRef(record_id)) .and(schema::records::log_id.eq(log_id)), ) - .first::<(RecordContent, Option)>(conn) + .first::(conn) .await .optional()? .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone()))?; @@ -309,7 +310,7 @@ where } } RecordStatus::Validated => { - if checkpoint.is_some() { + if record.registry_log_index.is_some() { super::RecordStatus::Published } else { super::RecordStatus::Validated @@ -325,17 +326,7 @@ where message: e.to_string(), } })?, - checkpoint: checkpoint.map(|c| { - SerdeEnvelope::from_parts_unchecked( - MapCheckpoint { - log_root: c.log_root.0, - log_length: c.log_length as u32, - map_root: c.map_root.0, - }, - c.key_id.0, - c.signature.0, - ) - }), + registry_log_index: record.registry_log_index.map(|idx| idx.try_into().unwrap()), }) } @@ -459,10 +450,11 @@ impl DataStore for PostgresDataStore { reject_record(conn.as_mut(), log_id, record_id, reason).await } - async fn validate_operator_record( + async fn commit_operator_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table @@ -473,7 +465,14 @@ impl DataStore for PostgresDataStore { .optional()? .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - match validate_record::(conn.as_mut(), log_id, record_id).await { + match commit_record::( + conn.as_mut(), + log_id, + record_id, + registry_log_index, + ) + .await + { Ok(()) => Ok(()), Err(e) => { reject_record(conn.as_mut(), log_id, record_id, &e.to_string()).await?; @@ -520,10 +519,11 @@ impl DataStore for PostgresDataStore { reject_record(conn.as_mut(), log_id, record_id, reason).await } - async fn validate_package_record( + async fn commit_package_record( &self, log_id: &LogId, record_id: &RecordId, + registry_log_index: u32, ) -> Result<(), DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table @@ -534,7 +534,14 @@ impl DataStore for PostgresDataStore { .optional()? .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - match validate_record::(conn.as_mut(), log_id, record_id).await { + match commit_record::( + conn.as_mut(), + log_id, + record_id, + registry_log_index, + ) + .await + { Ok(()) => Ok(()), Err(e) => { reject_record(conn.as_mut(), log_id, record_id, &e.to_string()).await?; @@ -633,15 +640,7 @@ impl DataStore for PostgresDataStore { &self, checkpoint_id: &AnyHash, checkpoint: SerdeEnvelope, - participants: &[LogLeaf], ) -> Result<(), DataStoreError> { - let participants = participants - .iter() - .map(|l| l.record_id.to_string()) - .collect::>(); - - let expected_count = participants.len(); - assert!(expected_count > 0); let mut conn = self.pool.get().await?; conn.transaction::<_, DataStoreError, _>(|conn| { @@ -653,7 +652,7 @@ impl DataStore for PostgresDataStore { } = checkpoint.as_ref(); // Insert the checkpoint - let id = diesel::insert_into(schema::checkpoints::table) + diesel::insert_into(schema::checkpoints::table) .values(NewCheckpoint { checkpoint_id: TextRef(checkpoint_id), log_root: TextRef(log_root), @@ -666,18 +665,6 @@ impl DataStore for PostgresDataStore { .get_result::(conn) .await?; - // Update all the participants - let count = diesel::update(schema::records::table) - .filter(schema::records::record_id.eq_any(participants)) - .set(schema::records::checkpoint_id.eq(id)) - .execute(conn) - .await?; - - assert_eq!( - count, expected_count, - "failed to checkpoint: failed to update all participants" - ); - Ok(()) } .scope_boxed() diff --git a/crates/server/src/datastore/postgres/models.rs b/crates/server/src/datastore/postgres/models.rs index ea2066f2..6dae65b6 100644 --- a/crates/server/src/datastore/postgres/models.rs +++ b/crates/server/src/datastore/postgres/models.rs @@ -107,6 +107,7 @@ pub struct Checkpoint { #[diesel(table_name = records)] pub struct RecordContent { pub status: RecordStatus, + pub registry_log_index: Option, pub reason: Option, pub content: Vec, } diff --git a/crates/server/src/datastore/postgres/schema.rs b/crates/server/src/datastore/postgres/schema.rs index f486601a..b95fcd53 100644 --- a/crates/server/src/datastore/postgres/schema.rs +++ b/crates/server/src/datastore/postgres/schema.rs @@ -50,6 +50,7 @@ diesel::table! { id -> Int4, log_id -> Int4, record_id -> Text, + registry_log_index -> Nullable, checkpoint_id -> Nullable, content -> Bytea, status -> RecordStatus, @@ -63,4 +64,9 @@ diesel::joinable!(contents -> records (record_id)); diesel::joinable!(records -> checkpoints (checkpoint_id)); diesel::joinable!(records -> logs (log_id)); -diesel::allow_tables_to_appear_in_same_query!(checkpoints, contents, logs, records,); +diesel::allow_tables_to_appear_in_same_query!( + checkpoints, + contents, + logs, + records, +); diff --git a/crates/server/src/services/core.rs b/crates/server/src/services/core.rs index a90666e6..9f193097 100644 --- a/crates/server/src/services/core.rs +++ b/crates/server/src/services/core.rs @@ -176,20 +176,19 @@ struct Inner { impl Inner { // Load state from DataStore or initialize empty state, returning any // entries that are not yet part of a checkpoint. - async fn initialize(&mut self) -> Result, CoreServiceError> { + async fn initialize(&mut self) -> Result { let initial = self.store.get_initial_leaves().await?.peekable(); pin_mut!(initial); // If there are no stored log entries, initialize a new state if initial.as_mut().peek().await.is_none() { - self.initialize_new().await?; - return Ok(vec![]); + return self.initialize_new().await; } // Reconstruct internal state from previously-stored log entires let state = self.state.get_mut(); let mut pending_entries = vec![]; - let mut last_checkpoint = Hash::::of(()).into(); + let mut last_checkpoint = Hash::::default().into(); while let Some(res) = initial.next().await { let InitialLeaf { leaf, checkpoint } = res?; @@ -210,10 +209,10 @@ impl Inner { } } - Ok(pending_entries) + Ok(self.store.get_latest_checkpoint().await?.into_contents()) } - async fn initialize_new(&mut self) -> Result<(), CoreServiceError> { + async fn initialize_new(&mut self) -> Result { let state = self.state.get_mut(); // Construct operator init record @@ -236,22 +235,28 @@ impl Inner { .store_operator_record(&log_id, &record_id, &signed_init_record) .await?; self.store - .validate_operator_record(&log_id, &record_id) + .commit_operator_record(&log_id, &record_id, 0) .await?; // Update state with init record let entry = LogLeaf { log_id, record_id }; state.push_entry(entry.clone()); - self.update_checkpoint(vec![entry]).await; + // "zero" checkpoint to be updated + let mut checkpoint = MapCheckpoint { + log_root: Hash::::default().into(), + log_length: 0, + map_root: Hash::::default().into(), + }; + self.update_checkpoint(&mut checkpoint).await; - Ok(()) + Ok(checkpoint) } // Runs the service's state update loop. async fn process_state_updates( self: Arc, - mut pending_entries: Vec, + mut checkpoint: MapCheckpoint, mut submit_entry_rx: mpsc::Receiver, checkpoint_interval: Duration, ) { @@ -261,30 +266,28 @@ impl Inner { loop { tokio::select! { entry = submit_entry_rx.recv() => match entry { - Some(entry) => { - if self.process_package_entry(&entry).await.is_ok() { - pending_entries.push(entry); - } - } + Some(entry) => self.process_package_entry(&entry).await, None => break, // Channel closed }, - _ = checkpoint_interval.tick() => { - let new_entries = std::mem::take(&mut pending_entries); - self.update_checkpoint(new_entries).await; - }, + _ = checkpoint_interval.tick() => self.update_checkpoint(&mut checkpoint).await, } } } // Processes a submitted package entry - async fn process_package_entry(&self, entry: &LogLeaf) -> Result<(), ()> { + async fn process_package_entry(&self, entry: &LogLeaf) { + let mut state = self.state.write().await; let LogLeaf { log_id, record_id } = entry; // Validate and commit the package entry to the store - self.store - .validate_package_record(log_id, record_id) - .await - .map_err(|err| match err { + let registry_log_index = (state.log.length() - 1).try_into().unwrap(); + let commit_res = self + .store + .commit_package_record(log_id, record_id, registry_log_index) + .await; + + if let Err(err) = commit_res { + match err { DataStoreError::Rejection(_) | DataStoreError::OperatorValidationFailed(_) | DataStoreError::PackageValidationFailed(_) => { @@ -295,26 +298,26 @@ impl Inner { // queue with retry logic tracing::error!("failed to validate package record `{record_id}`: {e}"); } - })?; + } + } - let mut state = self.state.write().await; state.push_entry(entry.clone()); - Ok(()) } // Store a checkpoint including the given new entries - async fn update_checkpoint(&self, new_entries: Vec) { - if new_entries.is_empty() { + async fn update_checkpoint(&self, checkpoint: &mut MapCheckpoint) { + let next = self.state.write().await.checkpoint(); + if &next == checkpoint { return; } - let checkpoint = self.state.write().await.checkpoint(); let signed_checkpoint = SerdeEnvelope::signed_contents(&self.operator_key, checkpoint.clone()).unwrap(); let checkpoint_id = Hash::::of(signed_checkpoint.as_ref()).into(); self.store - .store_checkpoint(&checkpoint_id, signed_checkpoint, &new_entries) + .store_checkpoint(&checkpoint_id, signed_checkpoint) .await .unwrap(); + *checkpoint = next; } } diff --git a/crates/transparency/src/log/vec_log.rs b/crates/transparency/src/log/vec_log.rs index 8abdb5e2..e47380a0 100644 --- a/crates/transparency/src/log/vec_log.rs +++ b/crates/transparency/src/log/vec_log.rs @@ -34,6 +34,11 @@ where D: SupportedDigest, V: VisitBytes, { + /// Returns the number of entries in the log. + pub fn length(&self) -> usize { + self.length + } + fn get_digest(&self, node: Node) -> Hash { self.tree[node.index()].clone() }