Skip to content

Commit

Permalink
Driver: Remove RwLock from ThreadPool (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
kane50613 authored Nov 27, 2023
1 parent 1f616da commit 1ec569b
Showing 1 changed file with 9 additions and 16 deletions.
25 changes: 9 additions & 16 deletions src/driver/tasks/mixer/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
Config,
};
use flume::Sender;
use parking_lot::RwLock;
use rusty_pool::ThreadPool;
use std::{result::Result as StdResult, sync::Arc, time::Duration};
use symphonia_core::{
formats::{SeekMode, SeekTo},
Expand All @@ -16,18 +16,14 @@ use tokio::runtime::Handle;

#[derive(Clone)]
pub struct BlockyTaskPool {
pool: Arc<RwLock<rusty_pool::ThreadPool>>,
pool: ThreadPool,
handle: Handle,
}

impl BlockyTaskPool {
pub fn new(handle: Handle) -> Self {
Self {
pool: Arc::new(RwLock::new(rusty_pool::ThreadPool::new(
0,
64,
Duration::from_secs(5),
))),
pool: ThreadPool::new(0, 64, Duration::from_secs(5)),
handle,
}
}
Expand All @@ -52,8 +48,7 @@ impl BlockyTaskPool {
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
});
} else {
let pool = self.pool.read();
pool.execute(move || {
self.pool.execute(move || {
let out = lazy.create();
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
});
Expand Down Expand Up @@ -91,10 +86,9 @@ impl BlockyTaskPool {
seek_time: Option<SeekTo>,
) {
let pool_clone = self.clone();
let pool = self.pool.read();

pool.execute(
move || match input.promote(config.codec_registry, config.format_registry) {
self.pool.execute(move || {
match input.promote(config.codec_registry, config.format_registry) {
Ok(LiveInput::Parsed(parsed)) => match seek_time {
// If seek time is zero, then wipe it out.
// Some formats (MKV) make SeekTo(0) require a backseek to realign with the
Expand All @@ -110,8 +104,8 @@ impl BlockyTaskPool {
Err(e) => {
drop(callback.send(MixerInputResultMessage::ParseErr(e.into())));
},
},
);
}
});
}

pub fn seek(
Expand All @@ -126,9 +120,8 @@ impl BlockyTaskPool {
config: Arc<Config>,
) {
let pool_clone = self.clone();
let pool = self.pool.read();

pool.execute(move || match rec {
self.pool.execute(move || match rec {
Some(rec) if (!input.supports_backseek) && backseek_needed => {
pool_clone.create(callback, Input::Lazy(rec), Some(seek_time), config);
},
Expand Down

0 comments on commit 1ec569b

Please sign in to comment.