Skip to content

Commit

Permalink
Listen for SQLite commit confirmation (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair authored Oct 17, 2022
1 parent c78349e commit 78d5112
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ jobs:
--mountpoint ./fusemnt \
--namespaces "db=test" &
sleep 1
./speedtest1 ./fusemnt/db
set +e
./speedtest1 ./fusemnt/db 2>&1 | tee log
set -e
grep "SQL error: no such table: t1" log
echo "Got the expected error."
build-ycsb:
name: Build ycsb
runs-on: ubuntu-20.04
Expand Down
28 changes: 24 additions & 4 deletions mvfs/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl MultiVersionVfs {
fixed_version,
txn: None,
lock: LockKind::None,
commit_confirmed: false,
history: TransitionHistory::default(),
sector_size: self.sector_size,
first_page,
Expand All @@ -136,6 +137,7 @@ pub struct Connection {
fixed_version: Option<String>,
txn: Option<Transaction>,
lock: LockKind,
commit_confirmed: bool,
history: TransitionHistory,
sector_size: usize,

Expand Down Expand Up @@ -368,7 +370,7 @@ impl Connection {
Ok(())
}

async fn finalize_transaction(&mut self) -> bool {
async fn finalize_transaction(&mut self, commit: bool) -> bool {
if self.write_buffer.len() > WRITE_CHUNK_SIZE.load(Ordering::Relaxed) {
self.force_flush_write_buffer().await;
}
Expand All @@ -394,15 +396,25 @@ impl Connection {
.chain(self.write_buffer.keys().copied())
.collect::<HashSet<_>>();

if self.fixed_version.is_some() {
let mut discard_reason: Option<&'static str> = None;

if !commit {
// sqlite didn't confirm to commit
discard_reason = Some("did not receive confirmation from sqlite");
} else if self.fixed_version.is_some() {
// Disallow write to snapshot
discard_reason = Some("write to snapshot is dropped");
}

if let Some(discard_reason) = discard_reason {
for index in &dirty_pages {
self.page_cache.invalidate(index);
}
self.write_buffer.clear();
tracing::warn!(
dirty_page_count = dirty_pages.len(),
"discarding write to snapshot"
reason = discard_reason,
"discarding transaction"
);
return true;
}
Expand Down Expand Up @@ -654,6 +666,7 @@ impl Connection {
self.virtual_version_counter = self.virtual_version_counter.wrapping_add(2);
}
self.lock = lock;
assert!(!self.commit_confirmed);

Ok(true)
}
Expand All @@ -669,7 +682,9 @@ impl Connection {

let reserved_level = LockKind::Reserved.level();
let commit_ok = if prev_lock.level() >= reserved_level && lock.level() < reserved_level {
self.finalize_transaction().await
let commit_confirmed = self.commit_confirmed;
self.commit_confirmed = false;
self.finalize_transaction(commit_confirmed).await
} else {
true
};
Expand All @@ -683,6 +698,11 @@ impl Connection {
Ok(commit_ok)
}

pub fn confirm_commit(&mut self) {
assert!(!self.commit_confirmed);
self.commit_confirmed = true;
}

pub fn reserved(&mut self) -> Result<bool, std::io::Error> {
Ok(false)
}
Expand Down
7 changes: 6 additions & 1 deletion mvsqlite/src/sqlite_vfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub trait DatabaseHandle: Sync {
self.lock(lock)
}

fn commit_phasetwo(&mut self) {}

/// Check if the database this handle points to holds a [LockKind::Reserved],
/// [LockKind::Pending] or [LockKind::Exclusive] lock.
fn reserved(&mut self) -> Result<bool, std::io::Error>;
Expand Down Expand Up @@ -1392,7 +1394,10 @@ mod io {

// Sent to the VFS after a transaction has been committed immediately but before the
// database is unlocked. Silently ignored.
ffi::SQLITE_FCNTL_COMMIT_PHASETWO => ffi::SQLITE_OK,
ffi::SQLITE_FCNTL_COMMIT_PHASETWO => {
state.file.commit_phasetwo();
ffi::SQLITE_OK
}

// Used for debugging. Swap the file handle with the one pointed to by the pArg
// argument. This capability is used during testing and only needs to be supported when
Expand Down
8 changes: 8 additions & 0 deletions mvsqlite/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl DatabaseHandle for Connection {
self.io.run(async { self.inner.unlock(lock.into()).await })
}

fn commit_phasetwo(&mut self) {
self.inner.confirm_commit();
}

fn reserved(&mut self) -> Result<bool, std::io::Error> {
Ok(false)
}
Expand Down Expand Up @@ -155,6 +159,10 @@ impl<W: WalIndex + 'static> DatabaseHandle for Box<dyn DatabaseHandle<WalIndex =
(**self).unlock(lock)
}

fn commit_phasetwo(&mut self) {
(**self).commit_phasetwo()
}

fn reserved(&mut self) -> Result<bool, std::io::Error> {
(**self).reserved()
}
Expand Down

0 comments on commit 78d5112

Please sign in to comment.