refactor(flow): make `from_substrait_*` async & worker handle refactor #4210
Walkthrough
The recent update introduces substantial asynchronous capabilities into the project by changing multiple synchronous functions to asynchronous, enabled by the …
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Worker
participant InterThreadCallClient
participant InterThreadCallServer
participant RequestHandler
Client->>+Worker: Send Request
Worker->>+InterThreadCallClient: Forward Request
InterThreadCallClient->>+RequestHandler: Handle Request asynchronously
RequestHandler-->>-InterThreadCallClient: Send Response
InterThreadCallClient-->>-Worker: Receive Response
Worker-->>-Client: Send Response
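The request/response flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Request`, `WorkerHandle`, and `spawn_worker` are hypothetical names, and `std::sync::mpsc` stands in for the `oneshot` channel so that the sketch needs no external crate.

```rust
use std::sync::mpsc;
use std::thread;

/// A request paired with the sending half of a single-use reply channel.
/// (Hypothetical type; the PR's real request type is not shown here.)
struct Request {
    payload: u64,
    // Stand-in for a oneshot sender: exactly one reply is sent per request.
    reply_to: mpsc::Sender<u64>,
}

/// Client-side handle that forwards requests to the worker thread.
struct WorkerHandle {
    tx: mpsc::Sender<Request>,
}

impl WorkerHandle {
    /// Send a request and wait for the single reply.
    /// Returns `None` if the worker has already shut down.
    fn call(&self, payload: u64) -> Option<u64> {
        let (reply_to, reply_rx) = mpsc::channel();
        self.tx.send(Request { payload, reply_to }).ok()?;
        reply_rx.recv().ok()
    }
}

/// Spawn a worker thread that answers each request exactly once.
fn spawn_worker() -> (WorkerHandle, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Request>();
    let join = thread::spawn(move || {
        // The loop ends once every sender (every `WorkerHandle`) is dropped.
        for req in rx {
            // Placeholder handler: double the payload.
            let _ = req.reply_to.send(req.payload * 2);
        }
    });
    (WorkerHandle { tx }, join)
}
```

Because each request carries its own reply channel, the caller does not need a shared response queue or request IDs to match a response to its request.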
Actionable comments posted: 0
Outside diff range and nitpick comments (2)
src/flow/src/transform/aggr.rs (2)
Line range hint 90-122: Review of `from_substrait_agg_measures` async implementation
The conversion of `from_substrait_agg_measures` to async is well executed. The handling of the optional filter using `.await` and `.transpose()` is a clean approach to conditionally process asynchronous operations. However, the comment `TODO(discord9): impl order_by` suggests that order by functionality is yet to be implemented, which might be critical depending on the use case.
Would you like assistance in implementing the order by functionality or should we track this as an issue?
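The `.await` plus `.transpose()` handling of an optional filter mentioned above can be sketched like this. It is an illustration under assumptions: `parse_filter` and `convert_optional_filter` are hypothetical stand-ins for the real expression conversion, and the tiny `block_on` exists only so the sketch runs without an async runtime.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal demo executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical async conversion step for a single filter expression.
async fn parse_filter(raw: &str) -> Result<i64, String> {
    raw.trim()
        .parse::<i64>()
        .map_err(|e| format!("bad filter {raw:?}: {e}"))
}

/// Await the conversion only when a filter is present, then flip
/// `Option<Result<T, E>>` into `Result<Option<T>, E>` with `transpose`.
async fn convert_optional_filter(raw: Option<&str>) -> Result<Option<i64>, String> {
    let converted: Option<Result<i64, String>> = match raw {
        Some(r) => Some(parse_filter(r).await),
        None => None,
    };
    converted.transpose()
}
```

The `match` is used instead of `Option::map` because `.await` cannot appear inside a plain closure; `transpose` then lets the caller propagate the error with `?` while keeping the "no filter" case as `Ok(None)`.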
Line range hint 150-165: Async conversion in `from_substrait_agg_func`
This function's conversion to async is well-handled, especially with the asynchronous fetching of arguments using `TypedExpr::from_substrait_rex(e, input_schema, extensions).await`. The error handling with `not_impl_err!` for unsupported argument types is a good defensive programming practice. However, the comment `TODO(discord9): impl filter` suggests that filter functionality is not yet implemented. This could be important for the function's completeness and correctness.
Would you like me to help implement the filtering functionality or should we create a tracking issue for this?
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files ignored due to path filters (1)
- Cargo.lock is excluded by `!**/*.lock`
Files selected for processing (8)
- src/flow/Cargo.toml (1 hunks)
- src/flow/src/adapter/worker.rs (14 hunks)
- src/flow/src/expr/scalar.rs (5 hunks)
- src/flow/src/transform.rs (1 hunks)
- src/flow/src/transform/aggr.rs (23 hunks)
- src/flow/src/transform/expr.rs (22 hunks)
- src/flow/src/transform/literal.rs (1 hunks)
- src/flow/src/transform/plan.rs (7 hunks)
Files skipped from review due to trivial changes (1)
- src/flow/Cargo.toml
Additional comments not posted (19)
src/flow/src/transform/literal.rs (1)
175-175: Refactor to asynchronous function call approved.
The change to make `from_substrait_plan` asynchronous aligns with the PR's goal to enhance asynchronous processing. This is a crucial step in improving the responsiveness and scalability of the system.
src/flow/src/transform.rs (1)
143-143: Asynchronous function implementation approved.
Converting `sql_to_flow_plan` to an asynchronous function is in line with modern Rust async programming practices, especially for IO-bound tasks. This change should help in managing more complex flows efficiently.
src/flow/src/transform/plan.rs (4)
35-35: Asynchronous conversion of function approved.
Converting `from_substrait_plan` to async improves the system's ability to handle concurrent operations efficiently, aligning with the PR's objectives.
68-68: Asynchronous function implementation approved.
Making `from_substrait_project` asynchronous is a significant improvement for handling complex projection operations that may involve IO or other asynchronous operations.
136-136: Asynchronous function implementation approved.
The async refactor of `from_substrait_filter` enhances the handling of filter operations, which can be particularly beneficial when these operations are dependent on external asynchronous calls or data fetching.
155-155: Asynchronous function implementation approved.
The conversion of `from_substrait_read` to an asynchronous function is crucial for efficient data retrieval operations, which are often IO-bound.
src/flow/src/adapter/worker.rs (1)
18-18: Comprehensive refactor of worker mechanisms approved.
The changes in `worker.rs` enhance the system's capability to handle asynchronous operations and inter-thread communications more robustly. The use of `oneshot` channels for response handling and the simplification of atomic operations are in line with modern Rust practices and contribute to better scalability and error handling.
Also applies to: 24-25, 42-42, 109-109, 125-125, 139-140, 156-156, 161-161, 174-176, 188-188, 257-285, 303-341, 394-394, 400-400, 405-405, 408-418, 424-424, 428-428, 432-432
src/flow/src/transform/expr.rs (5)
Line range hint 62-81: Refactor to asynchronous function signature and implementation.
The function `from_scalar_fn_to_df_fn_impl` has been correctly refactored to be asynchronous, which aligns with the PR's objectives to improve asynchronous handling. The usage of `.await` on the `substrait::df_logical_plan::consumer::from_substrait_rex` call is appropriate for handling asynchronous operations.
Line range hint 140-154: Refactor to asynchronous function signature and implementation.
The function `from_substrait_to_datafusion_scalar_func` has been refactored to be asynchronous. This change is part of the broader effort to make the codebase more asynchronous and responsive. The function now properly awaits the asynchronous function `RawDfScalarFn::from_proto` and `DfScalarFunction::try_from_raw_fn`, ensuring that all operations that could block are handled asynchronously.
Line range hint 165-299: Refactor to asynchronous function signature and implementation.
The function `from_substrait_scalar_func` has been converted to an asynchronous function. This is in line with the PR's objectives and the modernization of the codebase to use asynchronous programming patterns. The function effectively handles asynchronous operations, particularly with the `.await` calls on `TypedExpr::from_substrait_rex` within a loop, which is a good practice for operations that might block.
307-344: Refactor to asynchronous function signature and implementation.
The function `from_substrait_ifthen_rex` has been converted to asynchronous, which is consistent with the rest of the changes in the PR. The use of `.await` in potentially blocking calls within conditional logic is correctly implemented. This ensures that the function does not block the runtime while waiting for these operations to complete.
Line range hint 368-422: Refactor to asynchronous function signature and implementation.
The function `from_substrait_rex` has been marked with the `async_recursion` attribute and refactored to be asynchronous. This change supports recursive asynchronous calls within the function, which is crucial given the nature of the operations it performs. The handling of different `RexType` cases and their respective asynchronous operations is well implemented.
src/flow/src/expr/scalar.rs (3)
194-196: Approved the conversion to async for `try_from_raw_fn`.
The conversion to asynchronous function is correctly implemented and aligns with the PR's objectives to handle computational heavy tasks more efficiently.
284-286: Consider alternatives to `block_on` in deserialization.
While the use of `block_on` here is understood due to the computational heaviness of the task, it's generally recommended to avoid blocking in async contexts as it can lead to performance bottlenecks. Consider if there are alternative approaches that could be used here.
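For context, the pattern under discussion (a synchronous deserialization path that has to call an async constructor) looks roughly like the sketch below. Everything here is an assumption for illustration: `DecodedFn`, `decode_async`, and `decode_sync` are hypothetical, and the hand-rolled `block_on` stands in for whatever runtime helper the PR actually uses.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal demo executor: blocks the calling thread until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical decoded function description: arity byte followed by a UTF-8 name.
#[derive(Debug, PartialEq)]
struct DecodedFn {
    name: String,
    arity: u8,
}

/// Hypothetical async constructor, standing in for an async decoding step.
async fn decode_async(bytes: &[u8]) -> Result<DecodedFn, String> {
    let (&arity, name) = bytes.split_first().ok_or("empty input")?;
    let name = String::from_utf8(name.to_vec()).map_err(|e| e.to_string())?;
    Ok(DecodedFn { name, arity })
}

/// Synchronous entry point, as a deserializer callback would be: it cannot
/// `.await`, so it blocks on the async constructor instead.
fn decode_sync(bytes: &[u8]) -> Result<DecodedFn, String> {
    block_on(decode_async(bytes))
}
```

The reviewer's concern applies when `decode_sync` is itself reached from async code: the calling thread stays occupied until the inner future finishes, so other tasks scheduled on that thread cannot make progress in the meantime.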
Line range hint 317-326: Approved the conversion to async for `get_fn_impl`.
Converting `get_fn_impl` to async is appropriate given the I/O bound nature of decoding a function from bytes. This aligns with the PR's objectives to improve asynchronous handling.
src/flow/src/transform/aggr.rs (4)
Line range hint 61-72: Make `from_substrait_agg_grouping` asynchronous
The function `from_substrait_agg_grouping` has been correctly refactored to be asynchronous, which aligns with the PR's objective to enhance async handling. The use of `.await` on `TypedExpr::from_substrait_rex` is appropriate for handling asynchronous operations within the loop.
Line range hint 314-346: Async handling in `from_substrait_agg_rel`
The method `from_substrait_agg_rel` has been refactored to be asynchronous, which is in line with the PR's objectives. The async recursion is correctly applied, and the method handles complex asynchronous calls effectively, such as fetching group expressions and aggregate measures concurrently. The chaining and expansion of results are handled cleanly. However, ensure that the error handling in `not_impl_err!` for "Aggregate without an input is not supported" is sufficient and that such cases are appropriately managed upstream.
490-492: Ensure error handling in test for missing keys
This test case checks for error conditions when keys are missing, which is crucial for robustness. It's good to see async-await being used effectively in test scenarios to simulate real-world usage.
502-504: Basic functionality test for `from_substrait_plan`
The test verifies basic functionality and ensures that the async transformation works as expected. The result comparison using `assert_eq!` is a good practice for unit tests.
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4210      +/-   ##
==========================================
- Coverage   85.05%   84.58%   -0.47%
==========================================
  Files        1031     1038       +7
  Lines      181276   183041    +1765
==========================================
+ Hits       154176   154829     +653
- Misses      27100    28212    +1112
Actionable comments posted: 4
Outside diff range and nitpick comments (2)
src/flow/src/utils.rs (2)
43-43: Review the removal of serialization traits.
The removal of `Serialize` and `Deserialize` traits from `KeyExpiryManager` could affect serialization and deserialization processes across the system. Ensure that this change aligns with the overall architecture and that any serialization needs are handled appropriately elsewhere.
160-160: Consider the implications of struct changes on existing functionality.
The `Arrangement` struct now omits `Serialize` and `Deserialize` traits. This change should be carefully reviewed to ensure it does not break any existing functionality that relies on these traits, especially in distributed or persisted environments.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (8)
- src/flow/src/expr/func.rs (1 hunks)
- src/flow/src/expr/linear.rs (2 hunks)
- src/flow/src/expr/relation.rs (2 hunks)
- src/flow/src/expr/scalar.rs (7 hunks)
- src/flow/src/plan.rs (2 hunks)
- src/flow/src/plan/join.rs (3 hunks)
- src/flow/src/plan/reduce.rs (4 hunks)
- src/flow/src/utils.rs (2 hunks)
Files skipped from review due to trivial changes (2)
- src/flow/src/expr/func.rs
- src/flow/src/plan/reduce.rs
Additional comments not posted (2)
src/flow/src/plan/join.rs (1)
21-21: Serialization traits removed from join-related structs.
The removal of `Serialize` and `Deserialize` traits from `JoinPlan`, `JoinFilter`, `LinearJoinPlan`, and `LinearStagePlan` could impact serialization processes elsewhere in the codebase. Ensure that all serialization requirements are met with alternative implementations if necessary.
Also applies to: 27-27, 40-40, 63-63
src/flow/src/expr/scalar.rs (1)
46-46: Asynchronous function and serialization traits changes in scalar expressions.
The `try_from_raw_fn` function has been made asynchronous, which aligns with the overall goal of making the system more asynchronous. Additionally, the removal of `Serialize` and `Deserialize` traits from `TypedExpr` and `ScalarExpr` could impact serialization processes elsewhere in the codebase. Ensure that all serialization requirements are met with alternative implementations if necessary.
Also applies to: 132-132, 194-194, 267-267
Verification successful
Asynchronous function and serialization traits changes in scalar expressions.
The `try_from_raw_fn` function has been made asynchronous, which aligns with the overall goal of making the system more asynchronous. Additionally, the removal of `Serialize` and `Deserialize` traits from `TypedExpr` and `ScalarExpr` could impact serialization processes elsewhere in the codebase. However, after an extensive search, no direct or indirect serialization methods for `TypedExpr` and `ScalarExpr` were found in the codebase.
- Ensure that all serialization requirements are met with alternative implementations if necessary.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if `TypedExpr` and `ScalarExpr` are serialized elsewhere in the codebase.
# Test: Search for serialization of `TypedExpr` and `ScalarExpr`. Expect: No occurrences.
rg --type rust 'TypedExpr\|ScalarExpr' -A 3 | grep -i 'serialize'
Length of output: 65
Script:
#!/bin/bash
# Description: Verify if `TypedExpr` and `ScalarExpr` are serialized indirectly or through custom methods.
# Test: Search for custom serialization implementations for `TypedExpr`.
rg --type rust 'impl.*Serialize.*for.*TypedExpr'
# Test: Search for custom serialization implementations for `ScalarExpr`.
rg --type rust 'impl.*Serialize.*for.*ScalarExpr'
# Test: Search for any references to `TypedExpr` and `ScalarExpr` in context with serialization.
rg --type rust 'TypedExpr\|ScalarExpr' -A 10 -B 10 | grep -i 'serialize'
Length of output: 175
@coderabbitai resolve
Actions performed
Comments resolved.
DLJB
…GreptimeTeam#4210)
* refactor: use oneshot to receive result
* refactor: make from_substrait_* async
* refacrot: remove serde for plan&expr
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
make `from_substrait_*` async & refactor flow worker handle impl
Please explain IN DETAIL what the changes are in this PR and why they are needed:
- make `from_substrait_*` async
- use `block_on` in deser to call async method
- use `oneshot` to send back response to request
Checklist
Summary by CodeRabbit
New Features
- Added `async-recursion` dependency for enhanced async functionality.
Refactor
Style
Chores
- Removed `Serialize` and `Deserialize` traits from multiple structs and enums, indicating a change in serialization strategy.
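The `async-recursion` dependency listed in the summary relates to recursive async functions such as `from_substrait_rex`. An async function that awaits itself would otherwise produce a future type of unbounded size, and the usual workaround is to return a boxed future. The sketch below writes that boxing by hand on a hypothetical `Expr` type. It is an assumption-laden illustration of the general technique, not the PR's code and not the macro's exact expansion.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal demo executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical expression tree, standing in for a nested plan expression.
enum Expr {
    Lit(i64),
    Add(Box<Expr>, Box<Expr>),
    Neg(Box<Expr>),
}

/// Recursive async evaluation. Returning `Pin<Box<dyn Future>>` gives the
/// future a fixed size, so the function can await calls to itself.
fn eval(expr: &Expr) -> Pin<Box<dyn Future<Output = i64> + '_>> {
    Box::pin(async move {
        match expr {
            Expr::Lit(v) => *v,
            Expr::Add(l, r) => eval(l).await + eval(r).await,
            Expr::Neg(e) => -eval(e).await,
        }
    })
}
```

Each recursive call costs one heap allocation for the boxed future, which is usually acceptable for plan conversion where the tree depth is small.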