Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: return status when importing #494

Merged
merged 2 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/fuzz/src/crdt_fuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use arbitrary::Arbitrary;
use fxhash::FxHashSet;
use loro::{ContainerType, Frontiers, LoroError, LoroResult};
use loro::{ContainerType, Frontiers, ImportStatus, LoroError, LoroResult};
use tabled::TableIteratorExt;
use tracing::{info, info_span, trace};

Expand Down Expand Up @@ -271,7 +271,7 @@ impl CRDTFuzzer {
}
}

fn handle_import_result(e: LoroResult<()>) {
fn handle_import_result(e: LoroResult<ImportStatus>) {
match e {
Ok(_) => {}
Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion) => {
Expand Down
3 changes: 2 additions & 1 deletion crates/loro-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ impl TryFrom<&str> for TreeID {

#[cfg(feature = "wasm")]
pub mod wasm {
use crate::{LoroError, TreeID};
use crate::{IdSpanVector, LoroError, TreeID};
use js_sys::Map;
use wasm_bindgen::JsValue;
impl From<TreeID> for JsValue {
fn from(value: TreeID) -> Self {
Expand Down
28 changes: 28 additions & 0 deletions crates/loro-common/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ impl CounterSpan {
}
}

/// Extend the span so that it includes the range `[new_start, new_end)`.
///
/// Existing bounds are kept when they already cover the new range: the
/// start only ever moves left and the end only ever moves right.
/// NOTE(review): assumes a forward span (`start <= end`) — the import-path
/// callers that accumulate pending spans per peer only build forward
/// spans; confirm before reusing this for reversed spans.
pub fn extend_include(&mut self, new_start: Counter, new_end: Counter) {
    // Overwriting via `set_start`/`set_end` would *replace* the span with
    // the latest range, shrinking it whenever a later change from the same
    // peer starts after the current start. Take the union instead.
    if new_start < self.start {
        self.start = new_start;
    }
    if new_end > self.end {
        self.end = new_end;
    }
}

/// if we can merge element on the left, this method return the last atom of it
fn prev_pos(&self) -> i32 {
if self.start < self.end {
Expand Down Expand Up @@ -509,6 +514,29 @@ impl From<ID> for IdSpan {
}
}

#[cfg(feature = "wasm")]
mod wasm {
    use js_sys::Object;
    use wasm_bindgen::JsValue;

    use super::CounterSpan;

    /// Convert a [`CounterSpan`] into a plain JS object of the shape
    /// `{ start: number, end: number }`.
    impl From<CounterSpan> for JsValue {
        fn from(span: CounterSpan) -> Self {
            let target = Object::new();
            for (key, num) in [("start", span.start), ("end", span.end)] {
                // `Reflect::set` only fails for non-object targets, which a
                // freshly created `Object` can never be, so unwrap is safe.
                js_sys::Reflect::set(&target, &JsValue::from_str(key), &JsValue::from(num))
                    .unwrap();
            }
            JsValue::from(target)
        }
    }
}

#[cfg(test)]
mod test_id_span {
use super::*;
Expand Down
12 changes: 6 additions & 6 deletions crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
};

use loro::{
cursor::CannotFindRelativePosition, DocAnalysis, FrontiersNotIncluded, IdSpan, JsonPathError,
JsonSchema, Lamport, LoroDoc as InnerLoroDoc, LoroEncodeError, LoroError, LoroResult, PeerID,
Timestamp, ID,
cursor::CannotFindRelativePosition, DocAnalysis, FrontiersNotIncluded, IdSpan, ImportStatus,
JsonPathError, JsonSchema, Lamport, LoroDoc as InnerLoroDoc, LoroEncodeError, LoroError,
LoroResult, PeerID, Timestamp, ID,
};

use crate::{
Expand Down Expand Up @@ -253,7 +253,7 @@ impl LoroDoc {

/// Import updates/snapshot exported by [`LoroDoc::export_snapshot`] or [`LoroDoc::export_from`].
#[inline]
pub fn import(&self, bytes: &[u8]) -> Result<(), LoroError> {
pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
self.doc.import_with(bytes, "")
}

Expand All @@ -262,11 +262,11 @@ impl LoroDoc {
/// It marks the import with a custom `origin` string. It can be used to track the import source
/// in the generated events.
#[inline]
pub fn import_with(&self, bytes: &[u8], origin: &str) -> Result<(), LoroError> {
pub fn import_with(&self, bytes: &[u8], origin: &str) -> Result<ImportStatus, LoroError> {
self.doc.import_with(bytes, origin)
}

pub fn import_json_updates(&self, json: &str) -> Result<(), LoroError> {
pub fn import_json_updates(&self, json: &str) -> Result<ImportStatus, LoroError> {
self.doc.import_json_updates(json)
}

Expand Down
54 changes: 48 additions & 6 deletions crates/loro-internal/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ pub(crate) mod value_register;
pub(crate) use outdated_encode_reordered::{
decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
};
use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
pub(crate) use value::OwnedValue;

use crate::op::OpWithId;
use crate::version::Frontiers;
use crate::version::{Frontiers, VersionRange, VersionVectorDiff};
use crate::LoroDoc;
use crate::{oplog::OpLog, LoroError, VersionVector};
use loro_common::{IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID};
use loro_common::{
CounterSpan, HasCounter, HasCounterSpan, IdLpSpan, IdSpan, IdSpanVector, LoroEncodeError,
LoroResult, PeerID, ID,
};
use num_traits::{FromPrimitive, ToPrimitive};
use rle::{HasLength, Sliceable};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -166,6 +170,12 @@ impl TryFrom<[u8; 2]> for EncodeMode {
}
}

/// The outcome of importing updates or a snapshot into a document.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Id spans of the changes that were actually applied by this import
    /// (computed as the version-vector diff from before to after the import).
    pub success: IdSpanVector,
    /// Id spans of changes that were received but could not be applied yet
    /// because their dependencies are missing; `None` when nothing is pending.
    pub pending: Option<IdSpanVector>,
}

/// The encoder used to encode the container states.
///
/// Each container state can be represented by a sequence of operations.
Expand Down Expand Up @@ -243,16 +253,44 @@ pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode)
pub(crate) fn decode_oplog(
oplog: &mut OpLog,
parsed: ParsedHeaderAndBody,
) -> Result<(), LoroError> {
) -> Result<ImportStatus, LoroError> {
let before_vv = oplog.vv().clone();
let ParsedHeaderAndBody { mode, body, .. } = parsed;
match mode {
let changes = match mode {
EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
outdated_encode_reordered::decode_updates(oplog, body)
}
EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
EncodeMode::Auto => unreachable!(),
}?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);

let mut pending = IdSpanVector::default();
pending_changes.iter().for_each(|c| {
let peer = c.id.peer;
let start = c.ctr_start();
let end = c.ctr_end();
pending
.entry(peer)
.or_insert_with(|| CounterSpan::new(start, end))
.extend_include(start, end);
});
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
let after_vv = oplog.vv();
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(ImportStatus {
success: before_vv.diff(after_vv).right,
pending: (!pending.is_empty()).then_some(pending),
})
}

pub(crate) struct ParsedHeaderAndBody<'a> {
Expand Down Expand Up @@ -427,12 +465,16 @@ pub(crate) fn decode_snapshot(
doc: &LoroDoc,
mode: EncodeMode,
body: &[u8],
) -> Result<(), LoroError> {
) -> Result<ImportStatus, LoroError> {
match mode {
EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body),
EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()),
_ => unreachable!(),
}
};
Ok(ImportStatus {
success: doc.oplog_vv().diff(&Default::default()).left,
pending: None,
})
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
34 changes: 7 additions & 27 deletions crates/loro-internal/src/encoding/fast_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
//!
use std::io::{Read, Write};

use crate::{encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
use crate::{
change::Change, encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector,
};
use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroError, LoroResult};
use tracing::trace;
Expand Down Expand Up @@ -208,7 +210,7 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
}
}

pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<Vec<Change>, LoroError> {
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let oplog_bytes = &bytes[4..4 + oplog_len as usize];
let mut changes = ChangeStore::decode_snapshot_for_updates(
Expand All @@ -217,18 +219,7 @@ pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroEr
oplog.vv(),
)?;
changes.sort_unstable_by_key(|x| x.lamport);
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(())
Ok(changes)
}

pub(crate) fn encode_updates<W: std::io::Write>(doc: &LoroDoc, vv: &VersionVector, w: &mut W) {
Expand All @@ -244,7 +235,7 @@ pub(crate) fn encode_updates_in_range<W: std::io::Write>(
oplog.export_blocks_in_range(spans, w);
}

pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroError> {
pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<Vec<Change>, LoroError> {
let mut reader: &[u8] = body.as_ref();
let mut index = 0;
let self_vv = oplog.vv();
Expand All @@ -263,16 +254,5 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroE
}

changes.sort_unstable_by_key(|x| x.lamport);
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
Ok(())
Ok(changes)
}
35 changes: 25 additions & 10 deletions crates/loro-internal/src/encoding/json_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::sync::Arc;

use either::Either;
use loro_common::{
ContainerID, ContainerType, HasCounterSpan, IdLp, LoroError, LoroResult, LoroValue, PeerID,
TreeID, ID,
ContainerID, ContainerType, CounterSpan, HasCounter, HasCounterSpan, IdLp, IdSpanVector,
LoroError, LoroResult, LoroValue, PeerID, TreeID, ID,
};
use rle::{HasLength, RleVec, Sliceable};

Expand All @@ -22,8 +22,9 @@ use crate::{
OpLog, VersionVector,
};

use super::outdated_encode_reordered::{
import_changes_to_oplog, ImportChangesResult, ValueRegister,
use super::{
outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult, ValueRegister},
ImportStatus,
};
use json::{JsonOpContent, JsonSchema};

Expand Down Expand Up @@ -66,20 +67,34 @@ pub(crate) fn export_json<'a, 'c: 'a>(
}
}

pub(crate) fn import_json(oplog: &mut OpLog, json: JsonSchema) -> LoroResult<()> {
pub(crate) fn import_json(oplog: &mut OpLog, json: JsonSchema) -> LoroResult<ImportStatus> {
let before_vv = oplog.vv().clone();
let changes = decode_changes(json, &oplog.arena)?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
let mut pending = IdSpanVector::default();
pending_changes.iter().for_each(|c| {
let peer = c.id.peer;
let start = c.ctr_start();
let end = c.ctr_end();
pending
.entry(peer)
.or_insert_with(|| CounterSpan::new(start, end))
.extend_include(start, end);
});
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if changes_that_deps_on_trimmed_history.is_empty() {
Ok(())
} else {
Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion)
}
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
};
let after_vv = oplog.vv();
Ok(ImportStatus {
success: before_vv.diff(after_vv).right,
pending: (!pending.is_empty()).then_some(pending),
})
}

fn init_encode<'s, 'a: 's>(
Expand Down
22 changes: 8 additions & 14 deletions crates/loro-internal/src/encoding/outdated_encode_reordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{

use self::encode::{encode_changes, encode_ops, init_encode, TempOp};

use super::ImportStatus;
use super::{
arena::*,
parse_header_and_body,
Expand Down Expand Up @@ -136,7 +137,7 @@ pub(crate) fn encode_updates(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
}

#[instrument(skip_all)]
pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<()> {
pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<Vec<Change>> {
let iter = serde_columnar::iter_from_bytes::<EncodedDoc>(bytes)?;
let mut arenas = decode_arena(&iter.arenas)?;
let ops_map = extract_ops(
Expand All @@ -163,19 +164,8 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<()>
deps,
ops_map,
)?;
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
if !changes_that_deps_on_trimmed_history.is_empty() {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}

Ok(())
Ok(changes)
}

pub fn decode_import_blob_meta(bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
Expand Down Expand Up @@ -734,7 +724,11 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> {
doc.update_oplog_and_apply_delta_to_state_if_needed(
|oplog| {
oplog.try_apply_pending(latest_ids);
Ok(())
// ImportStatus is unnecessary
Ok(ImportStatus {
success: Default::default(),
pending: None,
})
},
"".into(),
)?;
Expand Down
Loading
Loading