Add DedicatedExecutor to FlightSQL Server #247
Conversation
src/execution/dedicated_executor.rs
Outdated
// also register the IO runtime for the current thread, since it might be used as well (esp. for the
// current thread RT)
register_io_runtime(io_handle.clone());
It's not clear to me what this means in practice. Maybe it will become more obvious once I start plugging this into the rest of the app.
It basically means that the IO (e.g. for object store) will be done on the "current" tokio runtime (aka the implicit one that is created by #[tokio::main]).
That part I was clear on - I meant more under what circumstances it's used in actual app code. No real concern right now - just memorializing my thoughts as I work on this.
src/execution/dedicated_executor.rs
Outdated
let runtime = runtime_builder
    .on_thread_start(move || register_io_runtime(io_handle.clone()))
    .build()
    .expect("Creating tokio runtime");
I think we want to allow configuring the number of threads - but I guess that could be done by the caller since this takes tokio::runtime::Builder.
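For illustration, a rough sketch of that caller-side configuration (the DedicatedExecutor constructor name and signature are assumptions based on the diff above, not the actual API):

```rust
use tokio::runtime::Builder;

// The caller pre-configures the Builder before handing it over, so the
// number of CPU worker threads stays under the caller's control.
let mut runtime_builder = Builder::new_multi_thread();
runtime_builder
    .worker_threads(4) // CPU-bound pool size chosen by the caller
    .thread_name("dedicated-executor");

// Hypothetical constructor taking the pre-configured Builder.
let executor = DedicatedExecutor::new("cpu-runtime", runtime_builder);
```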
src/execution/dedicated_executor.rs
Outdated
if tx_handle.send(Handle::current()).is_err() {
    return;
}
Maybe we want a log or something here
I think typically when a send handle (tx) fails to send it means the receiving side has hung up (no one is there to get the message), which can happen during normal shutdown
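A minimal sketch of the suggested logging (which logging crate the project uses is an assumption; the log crate is shown here):

```rust
if tx_handle.send(Handle::current()).is_err() {
    // The receiver was dropped, which normally only happens during shutdown,
    // so a debug-level log is enough rather than an error.
    log::debug!("runtime handle receiver dropped before the handle was sent; likely shutting down");
    return;
}
```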
src/execution/mod.rs
Outdated
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod dedicated_executor;
I will probably end up including this in the flightsql feature.
Very cool @matthewmturner -- thank you
cc @crepererum and @tustvold
src/execution/dedicated_executor.rs
Outdated
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
struct State {
We might be able to use a tokio JoinSet (https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html) instead of this now (I think this code predates JoinSet).
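For reference, a rough sketch of the JoinSet idea (not the actual DedicatedExecutor internals): the set tracks every spawned task, and shutdown becomes draining the set instead of hand-rolled channel bookkeeping.

```rust
use tokio::task::JoinSet;

async fn run_until_shutdown() {
    let mut tasks: JoinSet<()> = JoinSet::new();

    // Spawn CPU-bound jobs onto the set as they arrive.
    tasks.spawn(async {
        // CPU-bound work would go here.
    });

    // On shutdown, wait for every outstanding task to complete (or abort them).
    while let Some(result) = tasks.join_next().await {
        if let Err(e) = result {
            eprintln!("task panicked or was cancelled: {e}");
        }
    }
}
```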
src/execution/dedicated_executor.rs
Outdated
// specific language governing permissions and limitations
// under the License.

use std::{
This is great -- thank you @matthewmturner
Once we get this sorted out I definitely think we should contemplate merging it back upstream in DataFusion (with documentation). I can totally help with that
Sounds good
BTW I wrote up a bunch of backstory about why a separate executor is needed in apache/datafusion#13423. I hope to get an example up soon (that will show why this DedicatedExecutor is much nicer).
@alamb thanks for that - it looks great. I'm just plugging along getting this integrated into the FlightSQL server here. My first objective is to get all the pipes working and all the CPU bound work being executed by the current implementation of the dedicated executor. After that I will probably look into updating the implementation to use the
pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
    let ctx = self.session_ctx.clone();
    let task = async move { ctx.state().statement_to_plan(statement).await };
    if let Some(executor) = &self.executor {
        let job = executor.spawn(task).map_err(|e| eyre::eyre!(e));
        let job_res = job.await?;
        job_res.map_err(|e| eyre!(e))
    } else {
        task.await.map_err(|e| eyre!(e))
    }
}
Using dedicated executor here for logical planning
/// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available.
pub async fn execute_logical_plan(
    &self,
    logical_plan: LogicalPlan,
) -> Result<SendableRecordBatchStream> {
    let ctx = self.session_ctx.clone();
    let task = async move {
        let df = ctx.execute_logical_plan(logical_plan).await?;
        df.execute_stream().await
    };
    if let Some(executor) = &self.executor {
        let job = executor.spawn(task).map_err(|e| eyre!(e));
        let job_res = job.await?;
        job_res.map_err(|e| eyre!(e))
    } else {
        task.await.map_err(|e| eyre!(e))
    }
}
Using dedicated executor here for stream execution
.await
{

let logical_plan = self.execution.statement_to_logical_plan(statement).await;
FlightSQL server calls the method that uses the dedicated executor here.
match self
let stream = self
    .execution
    .session_ctx()
    .execute_logical_plan(plan)
    .await
FlightSQL server calls the method that uses the dedicated executor here.
src/config.rs
Outdated
fn default_dedicated_executor_threads_percent() -> f64 {
    0.75
}
By default we give the dedicated executor 75% of available CPU threads - since this is in the config it is configurable.
src/execution/executor/dedicated.rs
Outdated
let cpus = num_cpus::get();
let cpu_threads =
    (config.dedicated_executor_threads_percent * cpus as f64) as usize;

let mut runtime_builder = runtime_builder;
let runtime = runtime_builder
    .worker_threads(cpu_threads)
    .on_thread_start(move || register_io_runtime(io_handle.clone()))
    .build()
    .expect("Creating tokio runtime");
Set CPU worker threads here
src/main.rs
Outdated
let main_threads = if state.config.execution.dedicated_executor_enabled {
    // Just for IO
    (cpus as f64 * (1.0 - state.config.execution.dedicated_executor_threads_percent)) as usize
} else {
    cpus
};
Set main / IO worker threads here.
Maybe I could just build the threads logic into the config (i.e. have fields for main / dedicated executor threads) that are computed at config load time.
FWIW we found it is ok to slightly over-commit the CPUs (like if you have 8 CPUs total, it is often fine to have 8 CPUs in the dedicated worker pool plus 1 CPU for the main runtime).
For most workloads, leaving a single CPU to process network requests is totally fine (e.g. with 8 CPUs total, using 7 in the dedicated executor is fine, as it still leaves 1 entire CPU for network requests while the dedicated executor does most of the real work).
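In code form, the split described above might look roughly like this (illustrative only, not the actual dft config fields):

```rust
// Give the dedicated executor all but one CPU and keep a single thread for the
// main / network runtime; slight over-commit is also fine per the comment above.
let cpus = num_cpus::get();
let dedicated_threads = cpus.saturating_sub(1).max(1); // e.g. 7 of 8
let main_threads = 1; // one CPU is usually enough to service network requests
```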
That makes a lot of sense
Updated to this
@alamb if you get the chance to review this again that would be great, I think it's getting close. I plan to do a once over and add some more comments in the next day and hopefully merge after that.
Will try and check it out tomorrow morning.
Thank you @matthewmturner -- I think this looks very cool. It is also inspiring me to port this stuff upstream for the "different threadpool example"
ddl_path: Option<PathBuf>,
/// Dedicated executor for running CPU intensive work
executor: Option<DedicatedExecutor>,
It might make the code cleaner if you always created a DedicatedExecutor rather than only with the FlightSQL server (a possible future enhancement).
I was going back and forth on this. I ended up going with this approach so there was still a "default" DataFusion experience.
That being said, this is meant to be an opinionated DataFusion implementation so it does probably make sense to just have one approach.
I also had in mind doing some benchmarks under different loads - and the baseline would be without the dedicated executor, so the current setup would be useful for that.
For the time being I will live with this code smell until I figure out next steps, but I do think I will end up getting rid of this.
///
/// # Panic
/// Needs an IO runtime [registered](register_io_runtime).
pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output
One thing I don't see in this PR is actually calling spawn_io - if this isn't called, then during an ExecutionPlan that is run on the dedicated executor, the IO will be done on the DedicatedExecutor's threadpool.
Here is where influxdb3 calls this:
I think a better idea is likely to create some sort of ObjectStore wrapper that forwards all network calls to the IO runtime. I'll try and work up some version of that in the DataFusion example.
Thank you for this reference, very helpful
@alamb I understand iOx heavily uses parquet files, but based on what I see that means any other types of files would be run on the DedicatedExecutor - correct? I think I will need to come up with a more general purpose solution, as the idea with dft is to be general purpose / work with all DataFusion supported file types (CSV, JSON, Arrow IPC, etc) - perhaps I will create a wrapping ObjectStore where all calls to the underlying ObjectStore use spawn_io.
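A minimal sketch of what such a wrapper might look like, assuming the spawn_io from this PR and the object_store crate (the module path for spawn_io is an assumption, and the real version would implement the full ObjectStore trait, forwarding every method the same way):

```rust
use std::sync::Arc;

use object_store::{path::Path, GetResult, ObjectStore, Result};

// Path assumed from the diff above.
use crate::execution::dedicated_executor::spawn_io;

/// Wraps a real store and forwards each call onto the registered IO runtime.
#[derive(Debug)]
struct IoObjectStore {
    inner: Arc<dyn ObjectStore>,
}

impl IoObjectStore {
    /// Only `get` is shown; a full `ObjectStore` impl would repeat this pattern.
    async fn get(&self, location: &Path) -> Result<GetResult> {
        let inner = Arc::clone(&self.inner);
        let location = location.clone();
        // spawn_io moves the network call off the CPU-bound DedicatedExecutor pool.
        spawn_io(async move { inner.get(&location).await }).await
    }
}
```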
I am going to add this ObjectStore wrapper in a follow on PR. Then I think this wrapper ObjectStore could be used anywhere an ObjectStore is needed - like the ParquetFileReaderFactory.
Awesome -- thank you -- I think your idea is 👌 very good
Let's do it!
Adds a dedicated executor for running CPU bound work on the FlightSQL server.
There is interest from the DataFusion community for this, it was already on our roadmap, and I think the DFT FlightSQL server is a great place to have a reference implementation.
Initial inspiration and context can be found here.
Most of the initial implementation was copied from here with some tweaks for our current setup. In particular we don't have metrics yet in the FlightSQL server implementation (but it is on the roadmap) - I expect to do a follow on where metrics are integrated.