-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Abstract over logical and physical plan representations in Ballista #1677
Conversation
Added some additional changes:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this proposal @thinkharderdev
While I am not an expert in Ballista and I will defer to others for this PR, I did have a few comments:
Using a trait that handled serializing an entire logical plan would allow use cases like using alternate serializers for the plan (like one could imagine using JSON for example), but I am not sure how it directly helps serializing extension points.
What would you think about something like the following:
/// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {
/// Serializes a [UserDefinedLogicalNode] into an opaque set of
/// bytes for transport over the network
fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes, BallistaError>;
/// Deserializes a set of bytes created by
/// [serialize_extension_node] into a new instance of a
/// [UserDefinedLogicalNode]
fn deserialize_extension_node(bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>, BallistaError>;
// TBD Similar functions for the other extension points
}
struct DefaultExtensionSerializer {
}
impl ExtensionSerializer for DefaultExtensionSerializer {
fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes> {
Err(BallistaError::NotImplemented(format!("Default serializer does not know how to serialize {:?}", node)))
}
fn deserialize_extension_node(_bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>> {
Err(BallistaError::NotImplemented("Default serializer does not know how to deserialize user defined extensions".to_string()))
}
}
And an instance of &dyn ExtensionSerializer
could be passed when serializing plans?
Yeah, this is what I tried to do originally but it didn't work out very well. The basic problem I was running into is that since we decoding the message recursively from the leaves, as soon as you "break out" of the specific types defined in Ballista, you can no longer use any of the decoding machinery. For example, if I define a logical node like:
I have to encode my |
What if we changed the signature to something like the following (where the inputs are already provided)? /// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {
/// Serializes a [UserDefinedLogicalNode] into an opaque set of
/// bytes for transport over the network
fn serialize_extension_node(node: &dyn UserDefinedLogicalNode, inputs: Vec<LogicalNode>) -> Result<Bytes, BallistaError>;
//....
} |
Something like that could work. You might package it up in something like:
to allow you to pull the same trick on the decoding side. I think this could be complementary to the approach in this PR. We could change the signature for
|
e1f0ae7
to
098b9ae
Compare
…he ExecutionContext global to the server
022cf79
to
112fcd9
Compare
… custom object store
112fcd9
to
8057fa1
Compare
😆 |
@alamb Does this approach seem sensible? I think this PR is ready to merge unless we're uncomfortable with the basic approach. |
Looks fine with me -- cc @realno @yahoNanJing @Ted-Jiang any thoughts or opinions? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thinkharderdev there are couple left over comments that I think could be removed?
I am a fan of the proposed extension node message and trait design. I am cool with us merging this design as is since it provides new functionality. But I think if we go with the extension node design, then we won't need to have both the AsLogicalPlan
and AsExecutionPlan
trait anymore? And we will still be able to encode logical plan and physical plan as a proper typed message field in ballista.proto
instead of bytes?
// (&self.plan) | ||
// .try_into() | ||
// .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be deleted?
ballista/rust/scheduler/src/lib.rs
Outdated
// let mut config_builder = BallistaConfig::builder(); | ||
// for kv_pair in &settings { | ||
// config_builder = config_builder.set(&kv_pair.key, &kv_pair.value); | ||
// } | ||
// let config = config_builder.build().map_err(|e| { | ||
// let msg = format!("Could not parse configs: {}", e); | ||
// error!("{}", msg); | ||
// tonic::Status::internal(msg) | ||
// })?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed?
ballista/rust/scheduler/src/lib.rs
Outdated
error!("{}", msg); | ||
tonic::Status::internal(msg) | ||
})? | ||
// let ctx = create_datafusion_context(&config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left over comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this logic was removed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an oversight. I did not think these settings were actually used anywhere but was not accounting for the SQL case. Discussing fixes on #1848
That said, really cool working demo and thanks for taking a stab at this :) |
@houqp Thanks! Removed the commented code. I like the pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
where
Self: Sized;
fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
where
B: BufMut,
Self: Sized;
fn try_into_logical_plan(
&self,
ctx: &ExecutionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan, BallistaError>;
fn try_from_logical_plan(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<Self, BallistaError>
where
Self: Sized;
}
pub trait LogicalExtensionCodec: Debug + Send + Sync {
fn try_decode(
&self,
buf: &[u8],
inputs: &Vec<LogicalPlan>,
) -> Result<Extension, BallistaError>;
fn try_encode(&self, node: Extension, buf: &mut Vec<u8>)
-> Result<(), BallistaError>;
} where we extend the protobuf encoding of
I think this will be useful as well if we ultimately want to migrate to Substrait for a serializable representation of the plans. We can experiment without having to do a "hard cutover". I can also work on integrating the |
I will plan to merge this later today unless anyone would like to comment further CC @realno @yahoNanJing @Ted-Jiang @matthewmturner @andygrove as you have expressed interest in Ballista in the past |
LGTM thanks @thinkharderdev @alamb |
Good for me. Thank you @thinkharderdev @alamb |
Thanks everyone for their help and review! Thank you @thinkharderdev for your patience |
It seems that registering objectstore cannot be used. Please take a look at this issue.#2136 |
Which issue does this PR close?
Partially addresses apache/datafusion-ballista#8
Closes #.
Rationale for this change
This is an initial draft at how we can make Ballista able to leverage the extensibility of DataFusion. This PR in particular addresses a few items in apache/datafusion-ballista#8 :
SchedulerServer
over serializable representations of DataFusionLogicalPlan
sAsLogicalPlan
which can be implemented by a user to shim a custom serialization into existing Ballista functionality.AsExecutionPlan
which can be implemented by a user to shim a custom serialization into existing Ballista functionality.LogicalPlan
andExecutionPlan
"context aware" which should, in the immediate term help sort out Ballista does not support external file systems datafusion-ballista#10ExecutionContext
toSchedulerServer
andExecutor
to allow for registration of extensions (ObjectStore
etc).This is just an initial draft to demonstrate/validate the basic approach. Ideally we would extend this to:
What changes are included in this PR?
Are there any user-facing changes?
This shouldn't break anything using the existing executables as we default to the existing plan representation.