Commit

Fix lints

rbtcollins committed Apr 4, 2021
1 parent 056905a commit 5b92b3a
Showing 16 changed files with 76 additions and 74 deletions.
1 change: 1 addition & 0 deletions src/cli/self_update.rs
@@ -413,6 +413,7 @@ pub fn install(

fn rustc_or_cargo_exists_in_path() -> Result<()> {
// Ignore rustc and cargo if present in $HOME/.cargo/bin or a few other directories
+#[allow(clippy::ptr_arg)]
fn ignore_paths(path: &PathBuf) -> bool {
!path
.components()
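Context for the change above (not part of the commit): clippy::ptr_arg warns when a function takes &PathBuf, &String, or &Vec<T>, because the borrowed views &Path, &str, and &[T] are strictly more general. Here the lint is allowed rather than fixed, presumably because ignore_paths is handed to code that supplies &PathBuf values directly. A minimal sketch, with a made-up predicate, of what the lint flags and the signature it prefers:

use std::path::{Path, PathBuf};

// What clippy::ptr_arg flags: the function never needs PathBuf's owning
// API, so &PathBuf is an unnecessarily specific borrow.
fn flagged(path: &PathBuf) -> bool {
    path.components().count() > 2
}

// The form the lint prefers: &Path accepts &PathBuf via deref coercion
// as well as plain Path references.
fn preferred(path: &Path) -> bool {
    path.components().count() > 2
}

fn main() {
    let p = PathBuf::from("/home/user/.cargo/bin");
    assert!(flagged(&p));
    assert!(preferred(&p)); // &PathBuf coerces to &Path
    assert!(preferred(Path::new("/home/user/.cargo/bin")));
}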
1 change: 1 addition & 0 deletions src/cli/self_update/windows.rs
@@ -180,6 +180,7 @@ fn _apply_new_path(new_path: Option<Vec<u16>>) -> Result<()> {
}

// Tell other processes to update their environment
+#[allow(clippy::unnecessary_cast)]
unsafe {
SendMessageTimeoutA(
HWND_BROADCAST,
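Again for context only: clippy::unnecessary_cast fires on casts to a type the value already has, a pattern that is easy to hit when OR-ing Windows flag constants for calls like the SendMessageTimeoutA broadcast below these lines. Allowing the lint (rather than deleting the cast) is a reasonable choice when the constant's concrete type is not guaranteed across winapi versions or targets; that rationale is an assumption, not stated in the commit. A standalone illustration with a made-up constant:

fn main() {
    // Stand-in for a Windows flag constant such as SMTO_ABORTIFHUNG.
    const SMTO_FAKE: u32 = 0x0002;

    // What clippy::unnecessary_cast flags: SMTO_FAKE is already a u32,
    // so `as u32` is a no-op cast.
    let flags = SMTO_FAKE as u32 | 0x0008;

    // Lint-clean equivalent.
    let flags_clean = SMTO_FAKE | 0x0008;

    assert_eq!(flags, flags_clean);
}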
2 changes: 1 addition & 1 deletion src/cli/topical_doc.rs
@@ -22,7 +22,7 @@ fn index_html(doc: &DocData<'_>, wpath: &Path) -> Option<PathBuf> {
}
}

-fn dir_into_vec(dir: &PathBuf) -> Result<Vec<OsString>> {
+fn dir_into_vec(dir: &Path) -> Result<Vec<OsString>> {
let entries = fs::read_dir(dir).chain_err(|| format!("Opening directory {:?}", dir))?;
let mut v = Vec::new();
for entry in entries {
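Unlike the self_update.rs hunk, here the clippy::ptr_arg finding is fixed by changing the signature to &Path. Existing callers keep compiling because &PathBuf deref-coerces to &Path and fs::read_dir takes AsRef<Path>. A simplified sketch (std::io::Result instead of the crate's error-chain Result, and without the chain_err context):

use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Simplified variant of dir_into_vec after the change: the body is
// untouched because fs::read_dir accepts anything that is AsRef<Path>.
fn dir_into_vec(dir: &Path) -> io::Result<Vec<OsString>> {
    let mut v = Vec::new();
    for entry in fs::read_dir(dir)? {
        v.push(entry?.file_name());
    }
    Ok(v)
}

fn main() -> io::Result<()> {
    // A caller holding an owned PathBuf needs no change: &PathBuf -> &Path.
    let dir = PathBuf::from(".");
    println!("{} entries", dir_into_vec(&dir)?.len());
    Ok(())
}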
7 changes: 3 additions & 4 deletions src/config.rs
@@ -110,12 +110,12 @@ impl<'a> OverrideCfg<'a> {
|| file.toolchain.components.is_some()
|| file.toolchain.profile.is_some()
{
-return Err(ErrorKind::CannotSpecifyPathAndOptions(path.into()).into());
+return Err(ErrorKind::CannotSpecifyPathAndOptions(path).into());
}
Some(Toolchain::from_path(cfg, cfg_path, &path)?)
}
(Some(channel), Some(path)) => {
-return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path.into()).into())
+return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path).into())
}
(None, None) => None,
},
@@ -172,8 +172,7 @@ impl PgpPublicKey {
Ok(ret)
}
use pgp::types::KeyTrait;
-let mut ret = Vec::new();
-ret.push(format!("from {}", self));
+let mut ret = vec![format!("from {}", self)];
let key = self.key();
let keyid = format_hex(&key.key_id().to_vec(), "-", 4)?;
let algo = key.algorithm();
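The two config.rs hunks look like fixes for two further lints; the names are inferred from the shape of the change and not stated in the commit: clippy::useless_conversion, for .into() calls on a value that already has the target type, and clippy::vec_init_then_push, for a Vec::new() immediately followed by push. A self-contained sketch:

fn main() {
    // useless_conversion: converting a String into a String does nothing,
    // so clippy asks for the .into() to be dropped.
    let path = String::from("/tmp/toolchain");
    let flagged: String = path.into(); // no-op conversion (illustration)
    let clean = String::from("/tmp/toolchain");
    assert_eq!(flagged, clean);

    // vec_init_then_push: a Vec::new() that is immediately pushed to with
    // known elements is better written with the vec! macro.
    let mut ret = Vec::new();
    ret.push(format!("from {}", clean));
    let preferred = vec![format!("from {}", flagged)];
    assert_eq!(ret, preferred);
}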
26 changes: 11 additions & 15 deletions src/diskio/immediate.rs
@@ -11,7 +11,7 @@ use std::{
time::Instant,
};

-use super::{CompletedIO, Executor, Item};
+use super::{CompletedIo, Executor, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
@@ -35,7 +35,7 @@ impl ImmediateUnpacker {
}
}

-fn deque(&self) -> Box<dyn Iterator<Item = CompletedIO>> {
+fn deque(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
let mut guard = self.incremental_state.lock().unwrap();
// incremental file in progress
if let Some(ref mut state) = *guard {
@@ -52,16 +52,12 @@ impl ImmediateUnpacker {
if state.finished {
*guard = None;
}
-Box::new(Some(CompletedIO::Item(item)).into_iter())
+Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
// Case 2: pending chunks (which might be empty)
let mut completed_chunks = vec![];
completed_chunks.append(&mut state.completed_chunks);
-Box::new(
-completed_chunks
-.into_iter()
-.map(|size| CompletedIO::Chunk(size)),
-)
+Box::new(completed_chunks.into_iter().map(CompletedIo::Chunk))
}
} else {
Box::new(None.into_iter())
@@ -70,7 +66,7 @@ impl ImmediateUnpacker {
}

impl Executor for ImmediateUnpacker {
-fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
+fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
@@ -89,7 +85,7 @@ impl Executor for ImmediateUnpacker {
.start
.map(|s| Instant::now().saturating_duration_since(s));
*guard = None;
-Box::new(Some(CompletedIO::Item(item)).into_iter())
+Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
state.item = Some(item);
Box::new(None.into_iter())
@@ -103,20 +99,20 @@
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
-Box::new(Some(CompletedIO::Item(item)).into_iter())
+Box::new(Some(CompletedIo::Item(item)).into_iter())
}

-fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIO>> {
+fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

-fn completed(&self) -> Box<dyn Iterator<Item = CompletedIO>> {
+fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

fn incremental_file_state(&self) -> super::IncrementalFileState {
let mut state = self.incremental_state.lock().unwrap();
-if let Some(_) = *state {
+if state.is_some() {
unreachable!();
} else {
*state = Some(_IncrementalFileState {
@@ -188,7 +184,7 @@ impl IncrementalFileWriter {
if let Some(ref mut state) = *state {
if let Some(ref mut file) = (&mut self.file).as_mut() {
// Length 0 vector is used for clean EOF signalling.
-if chunk.len() == 0 {
+if chunk.is_empty() {
trace_scoped!("close", "name:": self.path_display);
drop(std::mem::take(&mut self.file));
state.finished = true;
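Most of the churn in immediate.rs (and in the files below) is the rename CompletedIO -> CompletedIo, which looks like clippy::upper_case_acronyms. The same file also picks up three smaller mechanical fixes: clippy::redundant_closure (.map(CompletedIo::Chunk) instead of a closure that only forwards to the constructor), clippy::redundant_pattern_matching (state.is_some() instead of if let Some(_) = *state), and clippy::len_zero (chunk.is_empty() instead of chunk.len() == 0). A standalone sketch with a stand-in enum:

// Stand-in for the renamed enum; only the Chunk variant matters here.
#[derive(Debug, PartialEq)]
enum CompletedIo {
    Chunk(usize),
}

fn main() {
    let sizes = vec![4usize, 8, 16];

    // redundant_closure: the closure only forwards its argument to the
    // tuple-variant constructor, so the constructor path can be mapped directly.
    let flagged: Vec<_> = sizes
        .clone()
        .into_iter()
        .map(|size| CompletedIo::Chunk(size))
        .collect();
    let preferred: Vec<_> = sizes.into_iter().map(CompletedIo::Chunk).collect();
    assert_eq!(flagged, preferred);

    // redundant_pattern_matching: `if let Some(_) = state` is just is_some().
    let state: Option<u8> = Some(1);
    assert!(state.is_some());

    // len_zero: prefer is_empty() over comparing len() with 0.
    let chunk: Vec<u8> = Vec::new();
    assert!(chunk.is_empty());
}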
16 changes: 8 additions & 8 deletions src/diskio/mod.rs
@@ -139,7 +139,7 @@ pub struct Item {
}

#[derive(Debug)]
-pub enum CompletedIO {
+pub enum CompletedIo {
/// A submitted Item has completed
Item(Item),
/// An IncrementalFile has completed a single chunk
@@ -211,14 +211,14 @@ impl IncrementalFileState {
mode: u32,
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
use std::sync::mpsc::channel;
-match self {
-&IncrementalFileState::Threaded => {
+match *self {
+IncrementalFileState::Threaded => {
let (tx, rx) = channel::<Vec<u8>>();
let content_callback = IncrementalFile::ThreadedReceiver(rx);
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
Ok((Box::new(chunk_submit), content_callback))
}
-&IncrementalFileState::Immediate(ref state) => {
+IncrementalFileState::Immediate(ref state) => {
let content_callback = IncrementalFile::ImmediateReceiver;
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
@@ -236,24 +236,24 @@ pub trait Executor {
/// During overload situations previously queued items may
/// need to be completed before the item is accepted:
/// consume the returned iterator.
-fn execute(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
+fn execute(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
item.start = Some(Instant::now());
self.dispatch(item)
}

/// Actually dispatch an operation.
/// This is called by the default execute() implementation and
/// should not be called directly.
-fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIO> + '_>;
+fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_>;

/// Wrap up any pending operations and iterate over them.
/// All operations submitted before the join will have been
/// returned either through ready/complete or join once join
/// returns.
-fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIO> + '_>;
+fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo> + '_>;

/// Iterate over completed items.
-fn completed(&self) -> Box<dyn Iterator<Item = CompletedIO> + '_>;
+fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo> + '_>;

/// Get any state needed for incremental file processing
fn incremental_file_state(&self) -> IncrementalFileState;
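Besides the enum rename, mod.rs switches match self { &Pattern => ... } to match *self { Pattern => ... }, which is the rewrite clippy::match_ref_pats suggests when every arm of a match on a reference starts with &. A small sketch with invented types:

enum Backend {
    Threaded,
    Immediate(u32),
}

impl Backend {
    // What clippy::match_ref_pats flags: every arm prefixes its pattern
    // with &, so the lint suggests dereferencing the scrutinee once instead.
    fn flagged(&self) -> String {
        match self {
            &Backend::Threaded => "threaded".to_string(),
            &Backend::Immediate(ref n) => format!("immediate ({})", n),
        }
    }

    // The form the diff moves to: match on *self, no & in the arms.
    fn preferred(&self) -> String {
        match *self {
            Backend::Threaded => "threaded".to_string(),
            Backend::Immediate(ref n) => format!("immediate ({})", n),
        }
    }
}

fn main() {
    assert_eq!(Backend::Threaded.flagged(), Backend::Threaded.preferred());
    assert_eq!(Backend::Immediate(4).preferred(), "immediate (4)");
}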
8 changes: 4 additions & 4 deletions src/diskio/test.rs
@@ -35,8 +35,8 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {
loop {
for work in io_executor.completed().collect::<Vec<_>>() {
match work {
-super::CompletedIO::Chunk(size) => written += size,
-super::CompletedIO::Item(item) => unreachable!(format!("{:?}", item)),
+super::CompletedIo::Chunk(size) => written += size,
+super::CompletedIo::Item(item) => unreachable!(format!("{:?}", item)),
}
}
if written == 20 {
@@ -48,8 +48,8 @@
loop {
for work in io_executor.completed().collect::<Vec<_>>() {
match work {
-super::CompletedIO::Chunk(_) => unreachable!(),
-super::CompletedIO::Item(_) => {
+super::CompletedIo::Chunk(_) => unreachable!(),
+super::CompletedIo::Item(_) => {
file_finished = true;
}
}
22 changes: 11 additions & 11 deletions src/diskio/threaded.rs
@@ -9,12 +9,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

-use super::{perform, CompletedIO, Executor, Item};
+use super::{perform, CompletedIo, Executor, Item};
use crate::utils::notifications::Notification;
use crate::utils::units::Unit;

enum Task {
-Request(CompletedIO),
+Request(CompletedIo),
// Used to synchronise in the join method.
Sentinel,
}
@@ -60,19 +60,19 @@ impl<'a> Threaded<'a> {
let n_files = self.n_files.clone();
self.pool.execute(move || {
let chunk_complete_callback = |size| {
-tx.send(Task::Request(CompletedIO::Chunk(size)))
+tx.send(Task::Request(CompletedIo::Chunk(size)))
.expect("receiver should be listening")
};
perform(&mut item, chunk_complete_callback);
n_files.fetch_sub(1, Ordering::Relaxed);
-tx.send(Task::Request(CompletedIO::Item(item)))
+tx.send(Task::Request(CompletedIo::Item(item)))
.expect("receiver should be listening");
});
}
}

impl<'a> Executor for Threaded<'a> {
-fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
+fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
// Yield any completed work before accepting new work - keep memory
// pressure under control
// - return an iterator that runs until we can submit and then submits
@@ -83,7 +83,7 @@
})
}

-fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
+fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
// Some explanation is in order. Even though the tar we are reading from (if
// any) will have had its FileWithProgress download tracking
// completed before we hit drop, that is not true if we are unwinding due to a
@@ -149,7 +149,7 @@ impl<'a> Executor for Threaded<'a> {
})
}

-fn completed(&self) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
+fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
Box::new(JoinIterator {
iter: self.rx.try_iter(),
consume_sentinel: true,
@@ -174,9 +174,9 @@ struct JoinIterator<T: Iterator<Item = Task>> {
}

impl<T: Iterator<Item = Task>> Iterator for JoinIterator<T> {
-type Item = CompletedIO;
+type Item = CompletedIo;

-fn next(&mut self) -> Option<CompletedIO> {
+fn next(&mut self) -> Option<CompletedIo> {
let task_o = self.iter.next();
match task_o {
None => None,
@@ -200,9 +200,9 @@ struct SubmitIterator<'a, 'b> {
}

impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> {
-type Item = CompletedIO;
+type Item = CompletedIo;

-fn next(&mut self) -> Option<CompletedIO> {
+fn next(&mut self) -> Option<CompletedIo> {
// The number here is arbitrary; just a number to stop exhausting fd's on linux
// and still allow rapid decompression to generate work to dispatch
// This function could perhaps be tuned: e.g. it may wait in rx.iter()
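threaded.rs only carries the rename, but the Sentinel comments around it are worth unpacking: completions arrive over an mpsc channel, and join() queues a Sentinel behind the outstanding work so that draining the channel until the Sentinel appears guarantees everything submitted earlier has been yielded. The sketch below only shows the drain-until-sentinel shape; the ordering guarantee in the real code comes from its FIFO worker pool, which this sketch sidesteps by joining the worker threads first.

use std::sync::mpsc::channel;
use std::thread;

enum Task {
    Request(usize), // a completed piece of work
    Sentinel,       // marker used to synchronise in join()
}

fn main() {
    let (tx, rx) = channel();

    // Simulated workers reporting completions back over the channel.
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(Task::Request(id)).expect("receiver should be listening");
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }

    // join(): push a Sentinel after the outstanding work, then drain the
    // channel until it shows up; everything sent before it has now been seen.
    tx.send(Task::Sentinel).unwrap();
    let mut completed = Vec::new();
    for task in rx.iter() {
        match task {
            Task::Request(id) => completed.push(id),
            Task::Sentinel => break,
        }
    }
    assert_eq!(completed.len(), 4);
}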
35 changes: 19 additions & 16 deletions src/dist/component/package.rs
@@ -10,7 +10,7 @@ use std::path::{Path, PathBuf};

use tar::EntryType;

-use crate::diskio::{get_executor, CompletedIO, Executor, Item, Kind};
+use crate::diskio::{get_executor, CompletedIo, Executor, Item, Kind};
use crate::dist::component::components::*;
use crate::dist::component::transaction::*;
use crate::dist::temp;
@@ -205,14 +205,14 @@ impl MemoryBudget {
used: 0,
}
}
-fn reclaim(&mut self, op: &CompletedIO) {
+fn reclaim(&mut self, op: &CompletedIo) {
match &op {
-CompletedIO::Item(op) => match &op.kind {
+CompletedIo::Item(op) => match &op.kind {
Kind::Directory => {}
Kind::File(content) => self.used -= content.len(),
Kind::IncrementalFile(_) => {}
},
-CompletedIO::Chunk(size) => self.used -= size,
+CompletedIo::Chunk(size) => self.used -= size,
}
}

@@ -231,8 +231,8 @@

/// Handle the async result of io operations
/// Replaces op.result with Ok(())
-fn filter_result(op: &mut CompletedIO) -> io::Result<()> {
-if let CompletedIO::Item(op) = op {
+fn filter_result(op: &mut CompletedIo) -> io::Result<()> {
+if let CompletedIo::Item(op) = op {
let result = mem::replace(&mut op.result, Ok(()));
match result {
Ok(_) => Ok(()),
@@ -265,10 +265,10 @@ fn trigger_children(
io_executor: &dyn Executor,
directories: &mut HashMap<PathBuf, DirStatus>,
budget: &mut MemoryBudget,
-op: CompletedIO,
+op: CompletedIo,
) -> Result<usize> {
let mut result = 0;
-if let CompletedIO::Item(item) = op {
+if let CompletedIo::Item(item) = op {
if let Kind::Directory = item.kind {
let mut pending = Vec::new();
directories
@@ -506,22 +506,25 @@ fn unpack_without_first_dir<'a, R: Read>(
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&*io_executor, &mut directories, &mut budget, item)?;
+let mut incremental_finished = false;
if let Some(sender) = incremental_file_sender.as_mut() {
if budget.available() as u64 >= IO_CHUNK_SIZE {
let mut v = Vec::with_capacity(IO_CHUNK_SIZE as usize);
-entry.read(&mut v)?;
+if entry.read(&mut v)? == 0 {
+incremental_finished = true;
+}
if !sender(v) {
-return Err(ErrorKind::DisconnectedChannel(full_path.clone()).into());
+return Err(ErrorKind::DisconnectedChannel(full_path).into());
}
}
}
+if incremental_finished {
+incremental_file_sender.take();
+}
}
-let mut incremental_file_sender =
-if let Some(incremental_file_sender) = incremental_file_sender {
-Some((incremental_file_sender, &mut entry))
-} else {
-None
-};
+let mut incremental_file_sender = incremental_file_sender
+.map(|incremental_file_sender| (incremental_file_sender, &mut entry));

// monitor io queue and feed in the content of the file (if needed)
while !flush_ios(
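The last visible hunk is not just a rename: it starts checking the byte count returned by entry.read(&mut v)?, treating Ok(0) as the end of the incremental file (the convention Read::read defines, and what clippy's unused_io_amount-style checks push toward), and it replaces an if let Some(..) { Some(..) } else { None } with Option::map, as clippy::manual_map suggests. A self-contained sketch of both ideas, with invented names (pump, sink) rather than the crate's types:

use std::io::{self, Read};

// Feed a reader into a chunk-consuming callback until read() returns
// Ok(0), i.e. end of input. Ignoring that count is what the old code did.
fn pump<R: Read>(mut src: R, mut sink: impl FnMut(Vec<u8>) -> bool) -> io::Result<usize> {
    let mut total = 0;
    loop {
        let mut buf = vec![0u8; 8];
        let n = src.read(&mut buf)?;
        if n == 0 {
            return Ok(total); // clean EOF: stop feeding the sink
        }
        buf.truncate(n);
        total += n;
        if !sink(buf) {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "sink disconnected"));
        }
    }
}

fn main() -> io::Result<()> {
    let data: &[u8] = b"hello incremental world";
    let mut chunks = Vec::new();
    let copied = pump(data, |chunk| {
        chunks.push(chunk);
        true // a false here models the DisconnectedChannel error path
    })?;
    assert_eq!(copied, data.len());

    // clippy::manual_map: an if-let that only wraps the value back into
    // Some(...) (with None in the else branch) is Option::map.
    let sender: Option<&str> = Some("tx");
    let paired = sender.map(|s| (s, copied));
    assert_eq!(paired, Some(("tx", 23)));
    Ok(())
}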
(Diffs for the remaining changed files are not shown here.)
