Commit
cleanup async (openzfs#459)
ahrens authored Sep 25, 2021
1 parent 8fdbf69 commit ab26732
Showing 2 changed files with 33 additions and 38 deletions.
cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs (20 changes: 10 additions & 10 deletions)
@@ -251,17 +251,17 @@ impl KernelConnectionState {
             .as_ref()
             .ok_or_else(|| anyhow!("no pool open"))?
             .clone();
-        // Need to write_block() before spawning, so that the Pool knows what's been written before resume_complete()
-        let fut = pool.write_block(block, slice.to_vec());
+        // XXX copying data
+        let vec = slice.to_vec();
         Ok(Box::pin(async move {
-            fut.await;
-            let mut nvl = NvList::new_unique_names();
-            nvl.insert("Type", "write done").unwrap();
-            nvl.insert("block", &block.0).unwrap();
-            nvl.insert("request_id", &request_id).unwrap();
-            nvl.insert("token", &token).unwrap();
-            trace!("sending response: {:?}", nvl);
-            Ok(Some(nvl))
+            pool.write_block(block, vec).await;
+            let mut response = NvList::new_unique_names();
+            response.insert("Type", "write done").unwrap();
+            response.insert("block", &block.0).unwrap();
+            response.insert("request_id", &request_id).unwrap();
+            response.insert("token", &token).unwrap();
+            trace!("sending response: {:?}", response);
+            Ok(Some(response))
         }))
     }
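The removed comment explains why the old code had to call write_block() before spawning: a plain fn that returns impl Future runs everything ahead of its async move block immediately at the call site, so the Pool learned about the write before the response future was spawned, whereas an async fn defers its whole body until the future is awaited. A minimal standalone sketch of that difference (not the agent's code; it assumes only a tokio runtime for main):

use std::future::Future;

// Old shape: work before the `async move` happens eagerly, at call time.
fn eager_then_async(label: &'static str) -> impl Future<Output = ()> {
    println!("{}: registered at call time", label); // runs immediately
    async move {
        println!("{}: finished when awaited", label); // runs when polled
    }
}

// New shape: nothing runs until the returned future is polled.
async fn lazy(label: &'static str) {
    println!("{}: body runs only on .await", label);
}

#[tokio::main]
async fn main() {
    let old_style = eager_then_async("old"); // "old: registered at call time" prints here
    let new_style = lazy("new");             // prints nothing yet
    old_style.await;                         // "old: finished when awaited"
    new_style.await;                         // "new: body runs only on .await"
}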

cmd/zfs_object_agent/zettaobject/src/pool.rs (51 changes: 23 additions & 28 deletions)
@@ -1441,7 +1441,7 @@ impl Pool {
         Self::check_pending_flushes(state, syncing_state);
     }

-    pub fn write_block(&self, block: BlockId, data: Vec<u8>) -> impl Future<Output = ()> {
+    pub async fn write_block(&self, block: BlockId, data: Vec<u8>) {
         let data2 = data.clone(); // XXX copying
         let receiver = self.state.with_syncing_state(|syncing_state| {
             // XXX change to return error
@@ -1463,39 +1463,34 @@
             receiver
         });
         let guid = self.state.shared_state.guid;
-        // XXX Cloning the zettacache has to clone several Arc's; maybe should
-        // have one that covers all the members? Or find a way to use the
-        // Pool's zettacache?
         let cache = match *WRITES_INGEST_TO_ZETTACACHE {
-            true => self.state.zettacache.as_ref().cloned(),
+            true => self.state.zettacache.as_ref(),
             false => None,
         };
-        async move {
-            if let Some(cache) = cache {
-                match cache.lookup(guid, block).await {
-                    LookupResponse::Present(_) => {
-                        // Surprisingly, the BlockId may be in the cache even
-                        // when writing a "new" block, if the system crashed or
-                        // the pool rewound, causing a BlockId that was already
-                        // persisted to the cache to be reused.
-                        //
-                        // XXX Ideally we would force-evict it and then insert
-                        // again. For now, we ignore the insertion request.
-                        // Subsequent lookups will return the wrong data, and we
-                        // rely on the checksum in the blkptr_t to catch it.
-                        // (Lookups without a preceeding insertion will also
-                        // return the wrong data, so this is no worse.)
-                        trace!(
-                            "writing block already in zettacache: {:?} {:?}",
-                            guid,
-                            block
-                        );
-                    }
-                    LookupResponse::Absent(key) => cache.insert(key, data2).await,
+        if let Some(cache) = cache {
+            match cache.lookup(guid, block).await {
+                LookupResponse::Present(_) => {
+                    // Surprisingly, the BlockId may be in the cache even
+                    // when writing a "new" block, if the system crashed or
+                    // the pool rewound, causing a BlockId that was already
+                    // persisted to the cache to be reused.
+                    //
+                    // XXX Ideally we would force-evict it and then insert
+                    // again. For now, we ignore the insertion request.
+                    // Subsequent lookups will return the wrong data, and we
+                    // rely on the checksum in the blkptr_t to catch it.
+                    // (Lookups without a preceeding insertion will also
+                    // return the wrong data, so this is no worse.)
+                    trace!(
+                        "writing block already in zettacache: {:?} {:?}",
+                        guid,
+                        block
+                    );
                 }
+                LookupResponse::Absent(key) => cache.insert(key, data2).await,
             }
-            receiver.await.unwrap();
         }
+        receiver.await.unwrap();
     }

     async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
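The deleted XXX comment about cloning the zettacache, and the .cloned() call it annotated, existed because the future returned by the old fn ... -> impl Future<Output = ()> could not hold a borrow of self, so it had to own its own clone of the Arc-backed cache; once write_block is an async fn, its future borrows self for as long as it lives, which is presumably why as_ref() now suffices. A minimal sketch of that clone-versus-borrow difference, using hypothetical Cache and Pool types in place of ZettaCache and the agent's Pool (and again assuming tokio for main):

use std::future::Future;
use std::sync::Arc;

// Hypothetical stand-ins, just to show the two ownership shapes.
#[derive(Clone)]
struct Cache {
    inner: Arc<Vec<u8>>, // stands in for the several Arc-backed members
}

struct Pool {
    cache: Option<Cache>,
}

impl Pool {
    // Old shape: the returned future cannot borrow `self`, so the cache is cloned in.
    fn size_old(&self) -> impl Future<Output = usize> {
        let cache = self.cache.as_ref().cloned(); // clones the Arc(s)
        async move { cache.map_or(0, |c| c.inner.len()) }
    }

    // New shape: the async fn's future borrows `self`, so a plain reference works.
    async fn size_new(&self) -> usize {
        self.cache.as_ref().map_or(0, |c| c.inner.len())
    }
}

#[tokio::main]
async fn main() {
    let pool = Pool {
        cache: Some(Cache { inner: Arc::new(vec![1, 2, 3]) }),
    };
    assert_eq!(pool.size_old().await, pool.size_new().await);
}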
