service: refine block device implementation #1332

Merged (3 commits) on Jun 21, 2023
9 changes: 4 additions & 5 deletions Cargo.lock

Cargo.lock is a generated file; its diff is not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -52,7 +52,7 @@ tar = "0.4.38"
tokio = { version = "1.24", features = ["macros"] }

# Build static linked openssl library
-openssl = { version = "0.10.48", features = ["vendored"] }
+openssl = { version = "0.10.55", features = ["vendored"] }
# pin openssl-src to bring in fix for https://rustsec.org/advisories/RUSTSEC-2022-0032
#openssl-src = { version = "111.22" }

6 changes: 3 additions & 3 deletions rafs/src/metadata/chunk.rs
@@ -15,7 +15,7 @@ use nydus_utils::digest::RafsDigest;

use crate::metadata::cached_v5::CachedChunkInfoV5;
use crate::metadata::direct_v5::DirectChunkInfoV5;
-use crate::metadata::direct_v6::{DirectChunkInfoV6, PlainChunkInfoV6};
+use crate::metadata::direct_v6::{DirectChunkInfoV6, TarfsChunkInfoV6};
use crate::metadata::layout::v5::RafsV5ChunkInfo;
use crate::metadata::{RafsStore, RafsVersion};
use crate::RafsIoWrite;
@@ -352,7 +352,7 @@ impl ChunkWrapper {
*self = Self::V6(to_rafs_v5_chunk_info(cki_v6));
} else if let Some(cki_v6) = cki.as_any().downcast_ref::<DirectChunkInfoV6>() {
*self = Self::V6(to_rafs_v5_chunk_info(cki_v6));
-} else if let Some(cki_v6) = cki.as_any().downcast_ref::<PlainChunkInfoV6>() {
+} else if let Some(cki_v6) = cki.as_any().downcast_ref::<TarfsChunkInfoV6>() {
*self = Self::V6(to_rafs_v5_chunk_info(cki_v6));
} else if let Some(cki_v5) = cki.as_any().downcast_ref::<CachedChunkInfoV5>() {
*self = Self::V5(to_rafs_v5_chunk_info(cki_v5));
@@ -370,7 +370,7 @@ fn as_blob_v5_chunk_info(cki: &dyn BlobChunkInfo) -> &dyn BlobV5ChunkInfo {
cki_v6
} else if let Some(cki_v6) = cki.as_any().downcast_ref::<DirectChunkInfoV6>() {
cki_v6
-} else if let Some(cki_v6) = cki.as_any().downcast_ref::<PlainChunkInfoV6>() {
+} else if let Some(cki_v6) = cki.as_any().downcast_ref::<TarfsChunkInfoV6>() {
cki_v6
} else if let Some(cki_v5) = cki.as_any().downcast_ref::<CachedChunkInfoV5>() {
cki_v5
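The chunk.rs change is a pure rename: the `downcast_ref` arms that used to match `PlainChunkInfoV6` now match `TarfsChunkInfoV6`. For readers unfamiliar with this dispatch style, here is a minimal, self-contained sketch of the `as_any()`/`downcast_ref` pattern used above; the trait and struct names below are hypothetical stand-ins, not the real RAFS types.

```rust
use std::any::Any;

// Hypothetical stand-ins for the real chunk-info types; only the
// downcast-based dispatch mirrors what chunk.rs does.
trait ChunkInfo {
    fn as_any(&self) -> &dyn Any;
}

struct TarfsChunk {
    index: u32,
}
struct DirectChunk {
    index: u32,
}

impl ChunkInfo for TarfsChunk {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl ChunkInfo for DirectChunk {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Recover the concrete type at runtime, as the TarfsChunkInfoV6 arms do in
// ChunkWrapper and as_blob_v5_chunk_info above.
fn chunk_index(cki: &dyn ChunkInfo) -> u32 {
    if let Some(c) = cki.as_any().downcast_ref::<TarfsChunk>() {
        c.index
    } else if let Some(c) = cki.as_any().downcast_ref::<DirectChunk>() {
        c.index
    } else {
        panic!("unknown chunk info type");
    }
}

fn main() {
    assert_eq!(chunk_index(&TarfsChunk { index: 7 }), 7);
    assert_eq!(chunk_index(&DirectChunk { index: 1 }), 1);
}
```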
32 changes: 18 additions & 14 deletions rafs/src/metadata/direct_v6.rs
@@ -538,7 +538,6 @@ impl OndiskInodeWrapper {
is_tail: bool,
) -> Option<BlobIoDesc> {
let blob_index = chunk_addr.blob_index();
-let chunk_index = chunk_addr.blob_ci_index();

match state.blob_table.get(blob_index) {
Err(e) => {
@@ -550,13 +549,12 @@
}
Ok(blob) => {
if is_tarfs_mode {
-let offset = (chunk_addr.block_addr() as u64) << EROFS_BLOCK_BITS_9;
let size = if is_tail {
(self.size() % self.chunk_size() as u64) as u32
} else {
self.chunk_size()
};
-let chunk = PlainChunkInfoV6::new(blob_index, chunk_index, offset, size);
+let chunk = TarfsChunkInfoV6::from_chunk_addr(chunk_addr, size);
let chunk = Arc::new(chunk) as Arc<dyn BlobChunkInfo>;
Some(BlobIoDesc::new(
blob,
@@ -566,6 +564,7 @@
user_io,
))
} else {
+let chunk_index = chunk_addr.blob_ci_index();
device
.create_io_chunk(blob.blob_index(), chunk_index)
.map(|v| BlobIoDesc::new(blob, v, content_offset, content_len, user_io))
@@ -1322,16 +1321,14 @@ impl RafsInodeExt for OndiskInodeWrapper {
))
})
} else if state.is_tarfs() {
-let blob_index = chunk_addr.blob_index();
-let chunk_index = chunk_addr.blob_ci_index();
-let offset = (chunk_addr.block_addr() as u64) << EROFS_BLOCK_BITS_9;
let size = if idx == self.get_chunk_count() - 1 {
(self.size() % self.chunk_size() as u64) as u32
} else {
self.chunk_size()
};
-let chunk = PlainChunkInfoV6::new(blob_index, chunk_index, offset, size);
-Ok(Arc::new(chunk))
+Ok(Arc::new(TarfsChunkInfoV6::from_chunk_addr(
+chunk_addr, size,
+)))
} else {
let mut chunk_map = self.mapping.info.chunk_map.lock().unwrap();
if chunk_map.is_none() {
@@ -1453,28 +1450,35 @@ impl BlobV5ChunkInfo for DirectChunkInfoV6 {
}

/// Rafs v6 fake ChunkInfo for Tarfs.
-pub(crate) struct PlainChunkInfoV6 {
+pub(crate) struct TarfsChunkInfoV6 {
blob_index: u32,
chunk_index: u32,
offset: u64,
size: u32,
}

-impl PlainChunkInfoV6 {
-/// Create a new instance of [PlainChunkInfoV6].
+impl TarfsChunkInfoV6 {
+/// Create a new instance of [TarfsChunkInfoV6].
pub fn new(blob_index: u32, chunk_index: u32, offset: u64, size: u32) -> Self {
-PlainChunkInfoV6 {
+TarfsChunkInfoV6 {
blob_index,
chunk_index,
offset,
size,
}
}

+fn from_chunk_addr(chunk_addr: &RafsV6InodeChunkAddr, size: u32) -> Self {
+let blob_index = chunk_addr.blob_index();
+let chunk_index = chunk_addr.blob_ci_index();
+let offset = (chunk_addr.block_addr() as u64) << EROFS_BLOCK_BITS_9;
+TarfsChunkInfoV6::new(blob_index, chunk_index, offset, size)
+}
}

const TARFS_DIGEST: RafsDigest = RafsDigest { data: [0u8; 32] };

-impl BlobChunkInfo for PlainChunkInfoV6 {
+impl BlobChunkInfo for TarfsChunkInfoV6 {
fn chunk_id(&self) -> &RafsDigest {
&TARFS_DIGEST
}
@@ -1516,7 +1520,7 @@ impl BlobChunkInfo for PlainChunkInfoV6 {
}
}

-impl BlobV5ChunkInfo for PlainChunkInfoV6 {
+impl BlobV5ChunkInfo for TarfsChunkInfoV6 {
fn index(&self) -> u32 {
self.chunk_index
}
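The direct_v6.rs change consolidates the tarfs offset/size computation that was previously duplicated at two call sites into `TarfsChunkInfoV6::from_chunk_addr`. Below is a minimal sketch of the arithmetic involved, under the assumption that tarfs images use 512-byte EROFS blocks (hence the 9-bit shift); the helper functions are illustrative and not part of the crate.

```rust
// Assumed constant: tarfs mode uses 512-byte EROFS blocks, i.e. a 9-bit shift.
const EROFS_BLOCK_BITS_9: u64 = 9;

/// Byte offset of a tarfs chunk: the block address shifted left by 9,
/// as computed in TarfsChunkInfoV6::from_chunk_addr in the diff above.
fn tarfs_chunk_offset(block_addr: u32) -> u64 {
    (block_addr as u64) << EROFS_BLOCK_BITS_9
}

/// Chunk size: a full chunk everywhere except the tail chunk, which is the
/// remainder of the file size modulo the chunk size (mirroring the `is_tail`
/// branch kept at both call sites).
fn chunk_size_of(file_size: u64, chunk_size: u32, is_tail: bool) -> u32 {
    if is_tail {
        (file_size % chunk_size as u64) as u32
    } else {
        chunk_size
    }
}

fn main() {
    // Block address 0x20 in a 512-byte-block image starts at byte 0x4000.
    assert_eq!(tarfs_chunk_offset(0x20), 0x4000);
    // A file of 1 MiB + 4 KiB split into 1 MiB chunks ends with a 4 KiB tail.
    assert_eq!(chunk_size_of(0x10_1000, 0x10_0000, true), 0x1000);
    assert_eq!(chunk_size_of(0x10_1000, 0x10_0000, false), 0x10_0000);
}
```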
13 changes: 7 additions & 6 deletions rafs/src/metadata/layout/v6.rs
@@ -161,17 +161,18 @@ impl RafsV6SuperBlock {
meta_size
)));
}
-if meta_size & (EROFS_BLOCK_SIZE_4096 - 1) != 0 {
+let block_size = if self.s_blkszbits == EROFS_BLOCK_BITS_9 {
+EROFS_BLOCK_SIZE_512
+} else {
+EROFS_BLOCK_SIZE_4096
+};
+if meta_size & (block_size - 1) != 0 {
return Err(einval!(format!(
"invalid Rafs v6 metadata size: bootstrap size {} is not aligned",
meta_size
)));
}
-let meta_addr = if self.s_blkszbits == EROFS_BLOCK_BITS_9 {
-u32::from_le(self.s_meta_blkaddr) as u64 * EROFS_BLOCK_SIZE_512
-} else {
-u32::from_le(self.s_meta_blkaddr) as u64 * EROFS_BLOCK_SIZE_4096
-};
+let meta_addr = u32::from_le(self.s_meta_blkaddr) as u64 * block_size;
if meta_addr > meta_size {
return Err(einval!(format!(
"invalid Rafs v6 meta block address 0x{:x}, meta file size 0x{:x}",
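The layout/v6.rs change makes the superblock validation block-size aware: with `s_blkszbits == 9` (tarfs) the metadata size must be 512-byte aligned and the meta block address is scaled by 512, otherwise 4096 is used for both. A small self-contained sketch of the same checks; the constant values are the standard EROFS block sizes, and the function is a simplified stand-in for the superblock check shown above.

```rust
// Assumed constants: the standard EROFS block sizes used by RAFS v6.
const EROFS_BLOCK_BITS_9: u8 = 9;
const EROFS_BLOCK_SIZE_512: u64 = 512;
const EROFS_BLOCK_SIZE_4096: u64 = 4096;

/// Simplified stand-in for the superblock check: pick the block size from
/// s_blkszbits, require the metadata size to be block-aligned, then scale the
/// meta block address by the same block size.
fn check_meta(meta_size: u64, s_blkszbits: u8, s_meta_blkaddr: u32) -> Result<u64, String> {
    let block_size = if s_blkszbits == EROFS_BLOCK_BITS_9 {
        EROFS_BLOCK_SIZE_512
    } else {
        EROFS_BLOCK_SIZE_4096
    };
    // block_size is a power of two, so `meta_size & (block_size - 1)` is the
    // remainder of meta_size / block_size.
    if meta_size & (block_size - 1) != 0 {
        return Err(format!("bootstrap size {} is not aligned", meta_size));
    }
    let meta_addr = s_meta_blkaddr as u64 * block_size;
    if meta_addr > meta_size {
        return Err(format!(
            "meta block address 0x{:x} exceeds file size 0x{:x}",
            meta_addr, meta_size
        ));
    }
    Ok(meta_addr)
}

fn main() {
    // Tarfs bootstrap (512-byte blocks): block address 8 maps to byte 0x1000.
    assert_eq!(check_meta(0x2000, 9, 8), Ok(0x1000));
    // 0x2100 is not 4 KiB aligned, so a 4096-byte-block image is rejected.
    assert!(check_meta(0x2100, 12, 1).is_err());
}
```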
6 changes: 2 additions & 4 deletions service/src/blob_cache.rs
@@ -173,9 +173,7 @@ struct BlobCacheState {

impl BlobCacheState {
fn new() -> Self {
-Self {
-id_to_config_map: HashMap::new(),
-}
+Self::default()
}

fn try_add(&mut self, config: BlobConfig) -> Result<()> {
@@ -322,7 +320,7 @@ impl BlobCacheMgr {
let config = entry
.blob_config
.as_ref()
.ok_or_else(|| einval!("missing blob cache configuration information"))?;
.ok_or_else(|| einval!("blob_cache: missing blob cache configuration information"))?;

if entry.blob_id.contains(ID_SPLITTER) {
return Err(einval!("blob_cache: `blob_id` for meta blob is invalid"));
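The blob_cache.rs change replaces the hand-written empty-map construction with `Self::default()` (and prefixes the error message with `blob_cache:` for consistency with the other messages in the file). This works when the struct's only field is a `HashMap`, whose `Default` is the empty map. A tiny sketch with a stand-in struct, assuming the real `BlobCacheState` derives or implements `Default`:

```rust
use std::collections::HashMap;

// Stand-in for BlobCacheState: with a single HashMap field, the derived
// Default produces exactly the value the removed manual constructor built.
#[derive(Default)]
struct State {
    id_to_config_map: HashMap<String, String>,
}

impl State {
    fn new() -> Self {
        Self::default()
    }
}

fn main() {
    assert!(State::new().id_to_config_map.is_empty());
}
```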
63 changes: 33 additions & 30 deletions service/src/block_device.rs
@@ -58,9 +58,23 @@ pub struct BlockDevice {

impl BlockDevice {
/// Create a new instance of [BlockDevice].
-pub fn new(blob_id: String, cache_mgr: Arc<BlobCacheMgr>) -> Result<Self> {
+pub fn new(blob_entry: BlobCacheEntry) -> Result<Self> {
+let cache_mgr = Arc::new(BlobCacheMgr::new());
+cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
+eother!(format!(
+"block_device: failed to add blob into CacheMgr, {}",
+e
+))
+})?;
+let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id);
+
+BlockDevice::new_with_cache_manager(blob_id, cache_mgr)
+}
+
+/// Create a new instance of [BlockDevice] with provided blob cache manager.
+pub fn new_with_cache_manager(blob_id: String, cache_mgr: Arc<BlobCacheMgr>) -> Result<Self> {
let mut ranges = IntervalTree::new();
-ranges.insert(Range::new(0, u32::MAX), None);
+ranges.insert(Range::new(0, u32::MAX - 1), None);

let meta_blob_config = match cache_mgr.get_config(&blob_id) {
None => {
@@ -288,31 +302,19 @@
pub fn export(
blob_entry: BlobCacheEntry,
output: Option<String>,
-localfs_dir: Option<String>,
+data_dir: Option<String>,
threads: u32,
verity: bool,
) -> Result<()> {
-let cache_mgr = Arc::new(BlobCacheMgr::new());
-cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
-eother!(format!(
-"block_device: failed to add blob into CacheMgr, {}",
-e
-))
-})?;
-let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id);
-let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone()).map_err(|e| {
-eother!(format!(
-"block_device: failed to create block device object, {}",
-e
-))
-})?;
+let block_device = BlockDevice::new(blob_entry)?;
let block_device = Arc::new(block_device);
let blocks = block_device.blocks();
+let blob_id = block_device.meta_blob_id();

let path = match output {
Some(v) => PathBuf::from(v),
None => {
-let path = match cache_mgr.get_config(&blob_id) {
+let path = match block_device.cache_mgr.get_config(&blob_id) {
Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(),
_ => return Err(enoent!("block_device: failed to get meta blob")),
};
@@ -337,15 +339,15 @@
path.display()
))
})?;
-let dir = localfs_dir
-.ok_or_else(|| einval!("block_device: parameter `localfs_dir` is missing"))?;
+let dir = data_dir
+.ok_or_else(|| einval!("block_device: parameter `data_dir` is missing"))?;
let path = PathBuf::from(dir);
path.join(name.to_string() + ".disk")
}
};

let output_file = OpenOptions::new()
-.create_new(true)
+.create(true)
.read(true)
.write(true)
.open(&path)
@@ -401,8 +403,8 @@

for _i in 0..threads {
let count = min(blocks - pos, step);
-let mgr = cache_mgr.clone();
-let id = blob_id.clone();
+let mgr = block_device.cache_mgr.clone();
+let id = blob_id.to_string();
let path = path.to_path_buf();
let generator = generator.clone();

@@ -419,12 +421,13 @@
))
})?;
let file = Arc::new(tokio_uring::fs::File::from_std(output_file));
-let block_device = BlockDevice::new(id, mgr).map_err(|e| {
-eother!(format!(
-"block_device: failed to create block device object, {}",
-e
-))
-})?;
+let block_device =
+BlockDevice::new_with_cache_manager(id, mgr).map_err(|e| {
+eother!(format!(
+"block_device: failed to create block device object, {}",
+e
+))
+})?;
let device = Arc::new(block_device);

tokio_uring::start(async move {
@@ -594,7 +597,7 @@ mod tests {
assert!(mgr.get_config(&key).is_some());

let mgr = Arc::new(mgr);
-let device = BlockDevice::new(blob_id, mgr).unwrap();
+let device = BlockDevice::new_with_cache_manager(blob_id, mgr).unwrap();
assert_eq!(device.blocks(), 0x209);

tokio_uring::start(async move {
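The block_device.rs change splits construction into two entry points: `BlockDevice::new(blob_entry)` builds and owns its own `BlobCacheMgr`, while `BlockDevice::new_with_cache_manager(blob_id, cache_mgr)` keeps the old dependency-injection path used by the export worker threads, the NBD code, and the tests. A minimal sketch of that constructor split with stand-in types; the key derivation and field layout here are illustrative only (the real code uses `generate_blob_key` and `BlobCacheMgr::add_blob_entry`).

```rust
use std::sync::Arc;

// Stand-ins for BlobCacheMgr, BlobCacheEntry and BlockDevice.
struct CacheMgr;
struct Entry {
    domain_id: String,
    blob_id: String,
}

struct Device {
    cache_mgr: Arc<CacheMgr>,
    blob_id: String,
}

impl Device {
    /// Convenience path: build and own the cache manager from a blob entry.
    fn new(entry: Entry) -> Result<Self, String> {
        let cache_mgr = Arc::new(CacheMgr);
        // The real code registers the entry with the manager and derives the
        // key via generate_blob_key(); this format is made up for the sketch.
        let blob_id = format!("{}-{}", entry.domain_id, entry.blob_id);
        Self::new_with_cache_manager(blob_id, cache_mgr)
    }

    /// Injection path: reuse a cache manager the caller already holds.
    fn new_with_cache_manager(blob_id: String, cache_mgr: Arc<CacheMgr>) -> Result<Self, String> {
        Ok(Device { cache_mgr, blob_id })
    }
}

fn main() {
    let dev = Device::new(Entry {
        domain_id: "domain1".into(),
        blob_id: "blob1".into(),
    })
    .unwrap();
    assert_eq!(dev.blob_id, "domain1-blob1");
    // Another component (e.g. an export worker thread) can share the manager.
    let _shared = Arc::clone(&dev.cache_mgr);
}
```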
26 changes: 14 additions & 12 deletions service/src/block_nbd.rs
@@ -160,16 +160,18 @@ pub struct NbdWorker {
impl NbdWorker {
/// Run the event loop to handle NBD requests from kernel in asynchronous mode.
pub async fn run(self) {
-let device = match BlockDevice::new(self.blob_id.clone(), self.cache_mgr.clone()) {
-Ok(v) => v,
-Err(e) => {
-error!(
-"block_nbd: failed to create block device for {}, {}",
-self.blob_id, e
-);
-return;
-}
-};
+let device =
+match BlockDevice::new_with_cache_manager(self.blob_id.clone(), self.cache_mgr.clone())
+{
+Ok(v) => v,
+Err(e) => {
+error!(
+"block_nbd: failed to create block device for {}, {}",
+self.blob_id, e
+);
+return;
+}
+};

// Safe because the RawFd is valid during the lifetime of run().
let mut sock = unsafe { UnixStream::from_raw_fd(self.sock_user.as_raw_fd()) };
@@ -300,7 +302,7 @@ impl NbdDaemon {
let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id);
let cache_mgr = Arc::new(BlobCacheMgr::new());
cache_mgr.add_blob_entry(&blob_entry)?;
-let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone())?;
+let block_device = BlockDevice::new_with_cache_manager(blob_id.clone(), cache_mgr.clone())?;
let nbd_service = NbdService::new(Arc::new(block_device), nbd_path)?;

Ok(NbdDaemon {
@@ -591,7 +593,7 @@ mod tests {
assert!(mgr.get_config(&key).is_some());

let mgr = Arc::new(mgr);
-let device = BlockDevice::new(blob_id.clone(), mgr).unwrap();
+let device = BlockDevice::new_with_cache_manager(blob_id.clone(), mgr).unwrap();

Ok(Arc::new(device))
}