Optimize implementation of image-rs/stream #264

Merged 4 commits on Jul 11, 2023
1 change: 0 additions & 1 deletion image-rs/Cargo.toml
@@ -17,7 +17,6 @@ base64 = "0.13.0"
 cfg-if = { version = "1.0.0", optional = true }
 dircpy = { version = "0.3.12", optional = true }
 flate2 = "1.0"
-flume = "0.10.14"
 fs_extra = { version = "1.2.0", optional = true }
 futures = { version = "0.3.28", optional = true }
 futures-util = "0.3"
43 changes: 43 additions & 0 deletions image-rs/src/digest.rs
@@ -0,0 +1,43 @@
+// Copyright (c) 2022 Intel Corporation
+//
+// SPDX-License-Identifier: Apache-2.0
+
+use sha2::Digest;
+
+pub const DIGEST_SHA256_PREFIX: &str = "sha256:";
+pub const DIGEST_SHA512_PREFIX: &str = "sha512:";
+
+pub trait DigestHasher {
+    fn digest_update(&mut self, buf: &[u8]);
+    fn digest_finalize(self) -> String;
+}
+
+#[derive(Clone, Debug)]
+pub enum LayerDigestHasher {
+    Sha256(sha2::Sha256),
+    Sha512(sha2::Sha512),
+}
+
+impl DigestHasher for LayerDigestHasher {
+    fn digest_update(&mut self, buf: &[u8]) {
+        match self {
+            LayerDigestHasher::Sha256(hasher) => {
+                hasher.update(buf);
+            }
+            LayerDigestHasher::Sha512(hasher) => {
+                hasher.update(buf);
+            }
+        }
+    }
+
+    fn digest_finalize(self) -> String {
+        match self {
+            LayerDigestHasher::Sha256(hasher) => {
+                format!("{}{:x}", DIGEST_SHA256_PREFIX, hasher.finalize())
+            }
+            LayerDigestHasher::Sha512(hasher) => {
+                format!("{}{:x}", DIGEST_SHA512_PREFIX, hasher.finalize())
+            }
+        }
+    }
+}
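The extracted module keeps the digest prefixes and both hash states behind a single trait, so the stream and pull code can stay generic over the algorithm. A minimal usage sketch follows (illustrative caller, not part of this diff; it assumes the crate is linked under the name image_rs):

use image_rs::digest::{DigestHasher, LayerDigestHasher, DIGEST_SHA256_PREFIX};
use sha2::Digest;

fn layer_digest(chunks: &[&[u8]]) -> String {
    // sha2::Sha256::new() comes from the sha2::Digest trait, so it must be in scope.
    let mut hasher = LayerDigestHasher::Sha256(sha2::Sha256::new());
    for chunk in chunks {
        hasher.digest_update(chunk);
    }
    // Produces "sha256:" followed by 64 hex digits, directly comparable to an OCI diff_id.
    hasher.digest_finalize()
}

fn main() {
    let digest = layer_digest(&[b"layer ".as_slice(), b"bytes".as_slice()]);
    assert!(digest.starts_with(DIGEST_SHA256_PREFIX));
    println!("{digest}");
}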
1 change: 1 addition & 0 deletions image-rs/src/lib.rs
@@ -10,6 +10,7 @@ pub mod bundle;
 pub mod config;
 pub mod decoder;
 pub mod decrypt;
+pub mod digest;
 pub mod image;
 pub mod meta_store;
 #[cfg(feature = "nydus")]
24 changes: 11 additions & 13 deletions image-rs/src/pull.rs
@@ -16,14 +16,12 @@
 
 use crate::decoder::Compression;
 use crate::decrypt::Decryptor;
+use crate::digest::{DIGEST_SHA256_PREFIX, DIGEST_SHA512_PREFIX};
 use crate::image::LayerMeta;
 use crate::meta_store::MetaStore;
 use crate::stream::stream_processing;
 use crate::unpack::unpack;
 
-const DIGEST_SHA256: &str = "sha256";
-const DIGEST_SHA512: &str = "sha512";
-
 const ERR_NO_DECRYPT_CFG: &str = "decrypt_config is None";
 const ERR_BAD_UNCOMPRESSED_DIGEST: &str = "unsupported uncompressed digest format";
 const ERR_BAD_COMPRESSED_DIGEST: &str = "unsupported compressed digest format";
@@ -164,16 +162,16 @@
         layer_meta.decoder = Compression::try_from(media_type_str)?;
 
         if layer_meta.decoder == Compression::Uncompressed {
-            let digest = if diff_id.starts_with(DIGEST_SHA256) {
+            let digest = if diff_id.starts_with(DIGEST_SHA256_PREFIX) {
                 format!(
-                    "{}:{:x}",
-                    DIGEST_SHA256,
+                    "{}{:x}",
+                    DIGEST_SHA256_PREFIX,
                     sha2::Sha256::digest(plaintext_layer.as_slice())
                 )
-            } else if diff_id.starts_with(DIGEST_SHA512) {
+            } else if diff_id.starts_with(DIGEST_SHA512_PREFIX) {
                 format!(
-                    "{}:{:x}",
-                    DIGEST_SHA512,
+                    "{}{:x}",
+                    DIGEST_SHA512_PREFIX,
                     sha2::Sha512::digest(plaintext_layer.as_slice())
                 )
             } else {
@@ -188,12 +186,12 @@
                 .decoder
                 .decompress(plaintext_layer.as_slice(), &mut out)?;
 
-            if diff_id.starts_with(DIGEST_SHA256) {
+            if diff_id.starts_with(DIGEST_SHA256_PREFIX) {
                 layer_meta.uncompressed_digest =
-                    format!("{DIGEST_SHA256}:{:x}", sha2::Sha256::digest(&out));
-            } else if diff_id.starts_with(DIGEST_SHA512) {
+                    format!("{DIGEST_SHA256_PREFIX}{:x}", sha2::Sha256::digest(&out));
+            } else if diff_id.starts_with(DIGEST_SHA512_PREFIX) {
                 layer_meta.uncompressed_digest =
-                    format!("{DIGEST_SHA512}:{:x}", sha2::Sha512::digest(&out));
+                    format!("{DIGEST_SHA512_PREFIX}{:x}", sha2::Sha512::digest(&out));
             } else {
                 bail!("{}: {:?}", ERR_BAD_COMPRESSED_DIGEST, diff_id);
             }
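For reference, the diff_id strings these prefix checks dispatch on use the OCI digest format: algorithm name, colon, then hex. A hedged illustration (the hex value below is made up):

// Illustrative diff_id in OCI "<algorithm>:<hex>" form; the value is arbitrary.
let diff_id = "sha256:0f2c2adee0e222cd4ebdbbb1cbf9a3fa06eca3011e18d9d61297bdc8e0728ea6";
assert!(diff_id.starts_with(DIGEST_SHA256_PREFIX));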
@@ -380,7 +378,7 @@
 
         for image_url in oci_images.iter() {
             let tempdir = tempfile::tempdir().unwrap();
             let image = Reference::try_from(image_url.clone()).expect("create reference failed");

Check warning on line 381 in image-rs/src/pull.rs (GitHub Actions / Check, beta and nightly): using `.clone()` on a double reference, which returns `&str` instead of cloning the inner type

             let mut client = PullClient::new(
                 image,
                 tempdir.path(),
@@ -413,7 +411,7 @@
 
         for image_url in oci_images.iter() {
             let tempdir = tempfile::tempdir().unwrap();
             let image = Reference::try_from(image_url.clone()).expect("create reference failed");

Check warning on line 414 in image-rs/src/pull.rs (GitHub Actions / Check, beta and nightly): using `.clone()` on a double reference, which returns `&str` instead of cloning the inner type

             let mut client = PullClient::new(
                 image,
                 tempdir.path(),
@@ -457,7 +455,7 @@
 
         for image_url in oci_images.iter() {
             let tempdir = tempfile::tempdir().unwrap();
             let image = Reference::try_from(image_url.clone()).expect("create reference failed");

Check warning on line 458 in image-rs/src/pull.rs (GitHub Actions / Check, beta and nightly): using `.clone()` on a double reference, which returns `&str` instead of cloning the inner type

             let mut client = PullClient::new(
                 image,
                 tempdir.path(),
@@ -490,7 +488,7 @@
 
         for image_url in oci_images.iter() {
             let tempdir = tempfile::tempdir().unwrap();
             let image = Reference::try_from(image_url.clone()).expect("create reference failed");

Check warning on line 491 in image-rs/src/pull.rs (GitHub Actions / Check, beta and nightly): using `.clone()` on a double reference, which returns `&str` instead of cloning the inner type

             let mut client = PullClient::new(
                 image,
                 tempdir.path(),
92 changes: 26 additions & 66 deletions image-rs/src/stream.rs
@@ -2,65 +2,30 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::unpack::unpack;
 use anyhow::{anyhow, bail, Context, Result};
 use sha2::Digest;
 use std::fs;
 use std::io::{Cursor, Read};
 use std::path::{Path, PathBuf};
+use std::sync::mpsc::{channel, Receiver};
 use tokio::io::{AsyncRead, AsyncReadExt};
 
-const CAPACITY: usize = 32768;
-const DIGEST_SHA256: &str = "sha256";
-const DIGEST_SHA512: &str = "sha512";
+use crate::digest::{DigestHasher, LayerDigestHasher, DIGEST_SHA256_PREFIX, DIGEST_SHA512_PREFIX};
+use crate::unpack::unpack;
 
+const CAPACITY: usize = 32768;
 const ERR_BAD_UNCOMPRESSED_DIGEST: &str = "unsupported uncompressed digest format";
 
-pub trait DigestHasher {
-    fn digest_update(&mut self, buf: &[u8]);
-    fn digest_finalize(self) -> String;
-}
-
-#[derive(Clone, Debug)]
-pub enum LayerDigestHasher {
-    Sha256(sha2::Sha256),
-    Sha512(sha2::Sha512),
-}
-
-impl DigestHasher for LayerDigestHasher {
-    fn digest_update(&mut self, buf: &[u8]) {
-        match self {
-            LayerDigestHasher::Sha256(hasher) => {
-                hasher.update(buf);
-            }
-            LayerDigestHasher::Sha512(hasher) => {
-                hasher.update(buf);
-            }
-        }
-    }
-
-    fn digest_finalize(self) -> String {
-        match self {
-            LayerDigestHasher::Sha256(hasher) => {
-                format!("{}:{:x}", DIGEST_SHA256, hasher.finalize())
-            }
-            LayerDigestHasher::Sha512(hasher) => {
-                format!("{}:{:x}", DIGEST_SHA512, hasher.finalize())
-            }
-        }
-    }
-}
-
-// Wrap a flume channel with [`Read`](std::io::Read) support.
+// Wrap a channel with [`Read`](std::io::Read) support.
 // This can bridge the [`AsyncRead`](tokio::io::AsyncRead) from
 // decrypt/decompress and impl Read for unpack.
 struct ChannelRead {
-    rx: flume::Receiver<Vec<u8>>,
+    rx: Receiver<Vec<u8>>,
     current: Cursor<Vec<u8>>,
 }
 
 impl ChannelRead {
-    fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
+    fn new(rx: Receiver<Vec<u8>>) -> ChannelRead {
         ChannelRead {
             rx,
             current: Cursor::new(vec![]),
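The Read implementation for ChannelRead sits in context the diff leaves collapsed. As a hedged sketch of the usual pattern (an assumption, not the file's exact code): drain the buffered cursor first, block on recv() for the next chunk when it runs dry, and treat a closed channel as end of file:

impl Read for ChannelRead {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Refill only once the current chunk is fully consumed.
        if self.current.position() == self.current.get_ref().len() as u64 {
            if let Ok(chunk) = self.rx.recv() {
                self.current = Cursor::new(chunk);
            }
            // On RecvError every sender is gone; the drained cursor then
            // yields 0 bytes, which std::io readers interpret as EOF.
        }
        self.current.read(buf)
    }
}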
@@ -93,35 +58,30 @@ pub async fn stream_processing(
     destination: &Path,
 ) -> Result<String> {
     let dest = destination.to_path_buf();
-    let digest_str = if diff_id.starts_with(DIGEST_SHA256) {
-        let hasher = LayerDigestHasher::Sha256(sha2::Sha256::new());
-
-        channel_processing(layer_reader, hasher, dest)
-            .await
-            .map_err(|e| anyhow!("hasher {} : {:?}", DIGEST_SHA256, e))?
-    } else if diff_id.starts_with(DIGEST_SHA512) {
-        let hasher = LayerDigestHasher::Sha512(sha2::Sha512::new());
-
-        channel_processing(layer_reader, hasher, dest)
-            .await
-            .map_err(|e| anyhow!("hasher {} : {:?}", DIGEST_SHA512, e))?
+    let hasher = if diff_id.starts_with(DIGEST_SHA256_PREFIX) {
+        LayerDigestHasher::Sha256(sha2::Sha256::new())
+    } else if diff_id.starts_with(DIGEST_SHA512_PREFIX) {
+        LayerDigestHasher::Sha512(sha2::Sha512::new())
     } else {
         bail!("{}: {:?}", ERR_BAD_UNCOMPRESSED_DIGEST, diff_id);
     };
 
-    Ok(digest_str)
+    channel_processing(layer_reader, hasher, dest)
+        .await
+        .map_err(|e| anyhow!("hasher {} {:?}", DIGEST_SHA256_PREFIX, e))
 }
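A hypothetical call site (not part of this PR) shows the intent of the reworked function: hand it any AsyncRead, such as the decrypt or decompress stream, let it unpack into the destination while hashing, then compare the returned digest against the expected diff_id. The parameters elided by the diff are assumed to be layer_reader and diff_id, in that order:

async fn unpack_and_verify(
    layer: impl AsyncRead + Unpin,
    diff_id: &str,
    destination: &Path,
) -> Result<()> {
    // Hashing happens while unpacking, so verification needs no second pass over the layer.
    let digest = stream_processing(layer, diff_id, destination).await?;
    if digest != diff_id {
        bail!("layer digest mismatch: got {digest}, expected {diff_id}");
    }
    Ok(())
}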

 async fn channel_processing(
     mut layer_reader: (impl AsyncRead + Unpin),
     mut hasher: LayerDigestHasher,
     destination: PathBuf,
 ) -> Result<String> {
-    let (tx, rx) = flume::unbounded();
+    let (tx, rx) = channel();
     let unpack_thread = std::thread::spawn(move || {
         let mut input = ChannelRead::new(rx);
 
         if let Err(e) = unpack(&mut input, destination.as_path()) {
             // TODO
             fs::remove_dir_all(destination.as_path())
                 .context("Failed to roll back when unpacking")?;
             return Err(e);
@@ -130,8 +90,8 @@ async fn channel_processing(
         Result::<()>::Ok(())
     });
 
-    let mut buffer = [0; CAPACITY];
     loop {
+        let mut buffer = vec![0u8; CAPACITY];
         let n = layer_reader
             .read(&mut buffer)
             .await
@@ -140,9 +100,9 @@
             break;
         }
 
-        hasher.digest_update(&buffer[..n]);
-        tx.send_async(buffer[..n].to_vec())
-            .await
+        buffer.resize(n, 0);
+        hasher.digest_update(&buffer);
+        tx.send(buffer)
             .map_err(|e| anyhow!("channel: send failed {:?}", e))?;
     }
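Design note on the loop change: allocating the Vec inside the loop and resizing it to the bytes actually read lets the filled buffer itself be handed to tx.send, avoiding the extra per-chunk copy that buffer[..n].to_vec() performed; and since std's unbounded Sender::send never blocks, the async send_async call is no longer needed.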

@@ -182,8 +142,8 @@ mod tests {
         let layer_data = ar.into_inner().unwrap();
 
         let layer_digest = format!(
-            "{}:{:x}",
-            DIGEST_SHA256,
+            "{}{:x}",
+            DIGEST_SHA256_PREFIX,
             sha2::Sha256::digest(layer_data.as_slice())
         );
 
@@ -222,8 +182,8 @@
         let layer_data = ar.into_inner().unwrap();
 
         let layer_digest = format!(
-            "{}:{:x}",
-            DIGEST_SHA256,
+            "{}{:x}",
+            DIGEST_SHA256_PREFIX,
             sha2::Sha256::digest(layer_data.as_slice())
        );
 
@@ -238,8 +198,8 @@
         let tempdir = tempfile::tempdir().unwrap();
         let file_path = tempdir.path().join("layer1");
         let layer_digest = format!(
-            "{}:{:x}",
-            DIGEST_SHA512,
+            "{}{:x}",
+            DIGEST_SHA512_PREFIX,
             sha2::Sha512::digest(layer_data.as_slice())
         );
 