Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change the first param of travel change from id to ids #492

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ jobs:
with:
version: "0.2.92"
- uses: Swatinem/rust-cache@v2
- name: Check
run: cargo clippy --all-features
- name: Build
run: cargo build --verbose
- name: Run rust tests
run: deno task test
- name: Run wasm tests
run: deno task test-wasm
run: deno task test-all
1 change: 1 addition & 0 deletions crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl LoroDoc {
let s = self.doc.subscribe_local_update(Box::new(move |update| {
// TODO: should it be cloned?
callback.on_local_update(update.to_vec());
true
}));
Arc::new(Subscription(Arc::new(Mutex::new(s))))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/loro-internal/src/allocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![allow(dead_code)]
#![allow(unused)]

mod bfs;
pub(crate) use bfs::calc_critical_version_bfs as calc_critical_version;

Expand Down
7 changes: 3 additions & 4 deletions crates/loro-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use state::{TreeNode, TreeNodeWithChildren, TreeParentId};
use subscription::{LocalUpdateCallback, Observer, PeerIdUpdateCallback};
use txn::Transaction;
pub use undo::UndoManager;
use utils::subscription::SubscriberSet;
use utils::subscription::SubscriberSetWithQueue;
pub use utils::subscription::Subscription;
pub mod allocation;
pub mod awareness;
Expand Down Expand Up @@ -120,7 +120,6 @@ pub struct LoroDoc {
txn: Arc<Mutex<Option<Transaction>>>,
auto_commit: AtomicBool,
detached: AtomicBool,

local_update_subs: SubscriberSet<(), LocalUpdateCallback>,
peer_id_change_subs: SubscriberSet<(), PeerIdUpdateCallback>,
local_update_subs: SubscriberSetWithQueue<(), LocalUpdateCallback, Vec<u8>>,
peer_id_change_subs: SubscriberSetWithQueue<(), PeerIdUpdateCallback, ID>,
}
60 changes: 37 additions & 23 deletions crates/loro-internal/src/loro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
subscription::{LocalUpdateCallback, Observer, Subscriber},
txn::Transaction,
undo::DiffBatch,
utils::subscription::{SubscriberSet, Subscription},
utils::subscription::{SubscriberSetWithQueue, Subscription},
version::{shrink_frontiers, Frontiers, ImVersionVector},
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler,
VersionVector,
Expand Down Expand Up @@ -90,8 +90,8 @@ impl LoroDoc {
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
txn: global_txn,
arena,
local_update_subs: SubscriberSet::new(),
peer_id_change_subs: SubscriberSet::new(),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
}
}

Expand Down Expand Up @@ -120,9 +120,8 @@ impl LoroDoc {
txn,
auto_commit: AtomicBool::new(false),
detached: AtomicBool::new(self.is_detached()),

local_update_subs: SubscriberSet::new(),
peer_id_change_subs: SubscriberSet::new(),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
};

if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
Expand Down Expand Up @@ -276,11 +275,7 @@ impl LoroDoc {

let new_txn = self.txn().unwrap();
self.txn.try_lock().unwrap().replace(new_txn);

self.peer_id_change_subs.retain(&(), &mut |callback| {
callback(peer, next_id.counter);
true
});
self.peer_id_change_subs.emit(&(), next_id);
return Ok(());
}

Expand All @@ -297,10 +292,7 @@ impl LoroDoc {
.peer
.store(peer, std::sync::atomic::Ordering::Relaxed);
drop(doc_state);
self.peer_id_change_subs.retain(&(), &mut |callback| {
callback(peer, next_id.counter);
true
});
self.peer_id_change_subs.emit(&(), next_id);
Ok(())
}

Expand Down Expand Up @@ -978,7 +970,7 @@ impl LoroDoc {
}

pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
let (sub, activate) = self.local_update_subs.insert((), callback);
let (sub, activate) = self.local_update_subs.inner().insert((), callback);
activate();
sub
}
Expand Down Expand Up @@ -1504,24 +1496,36 @@ impl LoroDoc {
0
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
#[error("Target id not found {0:?}")]
TargetIdNotFound(ID),
#[error("History on the target version is trimmed")]
TargetVersionTrimmed,
}

impl LoroDoc {
pub fn travel_change_ancestors(
&self,
id: ID,
ids: &[ID],
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
) {
) -> Result<(), ChangeTravelError> {
struct PendingNode(ChangeMeta);
impl PartialEq for PendingNode {
fn eq(&self, other: &Self) -> bool {
self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
}
}

impl Eq for PendingNode {}
impl PartialOrd for PendingNode {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for PendingNode {
fn cmp(&self, other: &Self) -> Ordering {
self.0
Expand All @@ -1531,15 +1535,23 @@ impl LoroDoc {
}
}

if !self.oplog().try_lock().unwrap().vv().includes_id(id) {
return;
for id in ids {
let op_log = &self.oplog().try_lock().unwrap();
if !op_log.vv().includes_id(*id) {
return Err(ChangeTravelError::TargetIdNotFound(*id));
}
if op_log.dag.trimmed_vv().includes_id(*id) {
return Err(ChangeTravelError::TargetVersionTrimmed);
}
}

let mut visited = FxHashSet::default();
let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
pending.push(PendingNode(ChangeMeta::from_change(
&self.oplog().try_lock().unwrap().get_change_at(id).unwrap(),
)));
for id in ids {
pending.push(PendingNode(ChangeMeta::from_change(
&self.oplog().try_lock().unwrap().get_change_at(*id).unwrap(),
)));
}
while let Some(PendingNode(node)) = pending.pop() {
let deps = node.deps.clone();
if f(node).is_break() {
Expand All @@ -1558,6 +1570,8 @@ impl LoroDoc {
pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
}
}

Ok(())
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/loro-internal/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ use crate::{
Subscription,
};
use fxhash::FxHashMap;
use loro_common::{ContainerID, Counter, PeerID};
use loro_common::{ContainerID, ID};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};

/// The callback of the local update.
pub type LocalUpdateCallback = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
pub type LocalUpdateCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
/// The callback of the peer id change. The second argument is the next counter for the peer.
pub type PeerIdUpdateCallback = Box<dyn Fn(PeerID, Counter) + Send + Sync + 'static>;
pub type PeerIdUpdateCallback = Box<dyn Fn(&ID) -> bool + Send + Sync + 'static>;
pub type Subscriber = Arc<dyn (for<'a> Fn(DiffEvent<'a>)) + Send + Sync>;

impl LoroDoc {
/// Subscribe to the changes of the peer id.
pub fn subscribe_peer_id_change(&self, callback: PeerIdUpdateCallback) -> Subscription {
let (s, enable) = self.peer_id_change_subs.insert((), callback);
let (s, enable) = self.peer_id_change_subs.inner().insert((), callback);
enable();
s
}
Expand Down
22 changes: 11 additions & 11 deletions crates/loro-internal/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
use rle::{HasLength, Mergable, RleVec};
use smallvec::{smallvec, SmallVec};
use tracing::trace;

use crate::{
change::{Change, Lamport, Timestamp},
Expand Down Expand Up @@ -70,24 +69,25 @@ impl crate::LoroDoc {
);

let obs = self.observer.clone();
let local_update_subs = self.local_update_subs.clone();
let local_update_subs_weak = self.local_update_subs.downgrade();
txn.set_on_commit(Box::new(move |state, oplog, id_span| {
trace!("on_commit!");
let mut state = state.try_lock().unwrap();
let events = state.take_events();
drop(state);
for event in events {
trace!("on_commit! {:#?}", &event);
obs.emit(event);
}

if !local_update_subs.is_empty() {
let bytes =
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
local_update_subs.retain(&(), &mut |callback| {
callback(&bytes);
true
});
if id_span.atom_len() == 0 {
return;
}

if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
if !local_update_subs.inner().is_empty() {
let bytes =
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
local_update_subs.emit(&(), bytes);
}
}
}));

Expand Down
7 changes: 4 additions & 3 deletions crates/loro-internal/src/undo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,13 @@ impl UndoManager {
}
}));

let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, counter| {
let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
let mut inner = inner_clone2.try_lock().unwrap();
inner.undo_stack.clear();
inner.redo_stack.clear();
inner.next_counter = Some(counter);
peer_clone2.store(peer_id, std::sync::atomic::Ordering::Relaxed);
inner.next_counter = Some(id.counter);
peer_clone2.store(id.peer, std::sync::atomic::Ordering::Relaxed);
true
}));

UndoManager {
Expand Down
35 changes: 27 additions & 8 deletions crates/loro-internal/src/utils/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,12 @@ Apache License
END OF TERMS AND CONDITIONS

*/
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::error::Error;
use std::ptr::eq;
use smallvec::SmallVec;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::sync::{Mutex, Weak};
use std::{fmt::Debug, mem, sync::Arc};

use smallvec::SmallVec;

#[derive(Debug)]
pub enum SubscriptionError {
CannotEmitEventDueToRecursiveCall,
Expand Down Expand Up @@ -474,18 +471,40 @@ pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}

pub(crate) struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}

impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
Some(SubscriberSetWithQueue {
subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
queue: self.queue.upgrade()?,
})
}
}

impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
where
EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
Callback: 'static + Send + Sync + FnMut(&Payload) -> bool,
Payload: Send + Sync,
Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
Payload: Send + Sync + Debug,
{
pub fn new() -> Self {
Self {
subscriber_set: SubscriberSet::new(),
queue: Arc::new(Mutex::new(Default::default())),
}
}

pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
WeakSubscriberSetWithQueue {
subscriber_set: Arc::downgrade(&self.subscriber_set.0),
queue: Arc::downgrade(&self.queue),
}
}

pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
&self.subscriber_set
}
Expand Down
3 changes: 2 additions & 1 deletion crates/loro-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(non_snake_case)]
#![allow(clippy::empty_docs)]
#![allow(clippy::doc_lazy_continuation)]
#![warn(missing_docs)]
// #![warn(missing_docs)]

use convert::{js_to_version_vector, resolved_diff_to_js};
use js_sys::{Array, Object, Promise, Reflect, Uint8Array};
Expand Down Expand Up @@ -1241,6 +1241,7 @@ impl LoroDoc {
if let Err(e) = observer.call1(&arr.into()) {
console_error!("Error: {:?}", e);
}
true
})));

let closure = Closure::wrap(Box::new(move || {
Expand Down
9 changes: 5 additions & 4 deletions crates/loro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use loro_internal::cursor::PosQueryResult;
use loro_internal::cursor::Side;
use loro_internal::handler::HandlerTrait;
use loro_internal::handler::ValueOrHandler;
use loro_internal::loro::ChangeTravelError;
use loro_internal::undo::{OnPop, OnPush};
pub use loro_internal::version::ImVersionVector;
use loro_internal::DocState;
Expand Down Expand Up @@ -798,14 +799,14 @@ impl LoroDoc {
///
/// # Arguments
///
/// * `id` - The ID of the Change to start the traversal from.
/// * `ids` - The IDs of the Change to start the traversal from.
/// * `f` - A mutable function that is called for each ancestor. It can return `ControlFlow::Break(())` to stop the traversal.
pub fn travel_change_ancestors(
&self,
id: ID,
ids: &[ID],
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
) {
self.doc.travel_change_ancestors(id, f)
) -> Result<(), ChangeTravelError> {
self.doc.travel_change_ancestors(ids, f)
}
}

Expand Down
Loading
Loading