Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flow perf&fix df func call #4347

Merged
merged 6 commits into from
Jul 15, 2024
Merged

Conversation

discord9
Copy link
Contributor

@discord9 discord9 commented Jul 11, 2024

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

rewrite flow's worker running strategy and flow's source impl to prevent blocking, also fix some df function call and added tests.

  • make flow worker running freq related to data source's buf size
  • fix calling DATE_BIN datafusion function given wrong argument&add related sqlness tests

Checklist

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.

Summary by CodeRabbit

  • New Features

    • Introduced batch processing for rows in SourceSender for improved efficiency.
    • Added error handling capabilities in the mfp_subgraph function.
    • New method is_temporal in MfpPlan struct to identify temporal predicates.
  • Improvements

    • Enhanced flow control and row count tracking in FlowWorkerManager.
    • Updated logging mechanism in src_sink.rs to provide clearer insights on buffered rows.
    • Refactored reduce_subgraph to utilize batch processing for key-value extraction.
  • Bug Fixes

    • Corrected and unified the handling of different literal types and scalar values.
  • Tests

    • Added new test cases for interval interpretation and filtering, and functions like date_bin and date_trunc.

Copy link
Contributor

coderabbitai bot commented Jul 11, 2024

Warning

Rate limit exceeded

@discord9 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 8 seconds before requesting another review.

How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

Commits

Files that changed from the base of the PR and between c092b72 and a40915e.

Walkthrough

The changes enhance the processing efficiency and error handling within the FlowWorkerManager and related components by optimizing batch processing, refining flow control, and improving data conversion handling. Channel-based communication is introduced in the SourceSender struct for better performance, while new functions and constants like BATCH_SIZE streamline various operations. Error handling is improved across multiple modules, and new tests ensure robust functionality for temporal and interval data processing.

Changes

Files Change Summary
src/flow/src/adapter.rs Modified imports, added BATCH_SIZE constant, functions get_buf_size, run, and run_available updated for improved timing, row count handling, and flow control.
src/flow/src/adapter/node_context.rs Refactored SourceSender struct to use channels, enhanced try_send_all and send_rows methods for batch processing efficiency.
src/flow/src/compute/render/map.rs Added error handling for temporal filtering in mfp_subgraph function.
src/flow/src/compute/render/reduce.rs Introduced batch_split_rows_to_key_val function for efficient key-value extraction.
src/flow/src/compute/render/src_sink.rs Adjusted logging logic to reflect the number of rows buffered by the Source Operator.
src/flow/src/expr/linear.rs Added is_temporal method to MfpPlan struct for checking temporal predicates.
src/flow/src/repr.rs Introduced BATCH_SIZE constant.
src/flow/src/transform/expr.rs Improved error handling, added data type handling for timestamps, refactored functions for better literal argument processing and constant folding.
src/flow/src/transform/literal.rs Added to_substrait_literal function and modified from_substrait_literal function for better literal conversion handling.
tests/cases/standalone/common/flow/basic.sql Added tests for interval interpretation and data filtering, removed outdated insertions and selections.
tests/cases/standalone/common/flow/df_func.sql Added SQL queries for testing date_bin and date_trunc functions with timestamp intervals and number calculations.

Poem

In the code where data flows,
Channels hum and logic grows.
Rows are batched with nimble grace,
Errors caught, no time to waste.
Temporal checks ensure the fight,
Data shines in every byte.
A rabbit’s joy, this code's delight! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot added the docs-not-required This change does not impact docs. label Jul 11, 2024
feat: make source sender `send` non-blocking

feat: better control of flow worker freq

feat: support transform interval

fix: const folding df func args&tests

tests: update cast const fold

chore: adjust flow work's freq

refactor: batch split

feat: adaptive run freq flow worker&check for errors

chore: better debug log
@discord9 discord9 marked this pull request as ready for review July 12, 2024 07:45
@discord9 discord9 requested review from zhongzc, waynexia and a team as code owners July 12, 2024 07:45
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Outside diff range, codebase verification and nitpick comments (2)
src/flow/src/adapter.rs (1)

Line range hint 565-602: Ensure proper error handling for flush_all_sender.

The function handles errors from flush_all_sender but does not provide specific handling or logging for different error types. Ensure that all potential errors are logged and handled appropriately.

// Add specific error handling and logging for different error types
Err(err) => {
    common_telemetry::error!("Flush send buf errors: {:?}", err);
    // Handle specific error types if needed
    break;
}
src/flow/src/transform/expr.rs (1)

Line range hint 46-74: Ensure comprehensive type mappings.

The function typename_to_cdt handles various type names but lacks comprehensive mappings for all possible types. Ensure that all possible types are covered.

// Add handling for remaining types
"Time32(Second)" => CDT::time_second_datatype(),
"Time32(Millisecond)" => CDT::time_millisecond_datatype(),
// Add similar handling for other time types, intervals, etc.
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 63acc30 and 6dee935.

Files selected for processing (13)
  • src/flow/src/adapter.rs (6 hunks)
  • src/flow/src/adapter/node_context.rs (5 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
  • src/flow/src/compute/render/reduce.rs (2 hunks)
  • src/flow/src/compute/render/src_sink.rs (1 hunks)
  • src/flow/src/expr/linear.rs (1 hunks)
  • src/flow/src/repr.rs (1 hunks)
  • src/flow/src/transform/expr.rs (7 hunks)
  • src/flow/src/transform/literal.rs (2 hunks)
  • tests/cases/standalone/common/flow/basic.result (2 hunks)
  • tests/cases/standalone/common/flow/basic.sql (1 hunks)
  • tests/cases/standalone/common/flow/df_func.result (1 hunks)
  • tests/cases/standalone/common/flow/df_func.sql (1 hunks)
Additional comments not posted (31)
tests/cases/standalone/common/flow/basic.sql (4)

33-33: LGTM!

The comment clearly indicates the purpose of the following SQL statements.


41-43: LGTM!

The SQL statements for creating the out_num_cnt table are correct.


45-47: LGTM!

The SQL statement for creating the filter_numbers flow correctly tests interval interpretation.


49-53: LGTM!

The SQL statements for dropping the flow and tables are correct.

tests/cases/standalone/common/flow/basic.result (4)

62-62: LGTM!

The comment clearly indicates the purpose of the following SQL statement results.


72-74: LGTM!

The results of the SQL statements for creating the out_num_cnt table are correct.


78-90: LGTM!

The results of the SQL statements for creating the filter_numbers flow and showing its creation are correct.


92-100: LGTM!

The results of the SQL statements for dropping the flow and tables are correct.

tests/cases/standalone/common/flow/df_func.sql (6)

68-68: LGTM!

The comment clearly indicates the purpose of the following SQL statements.


70-75: LGTM!

The SQL statements for creating the numbers_input table are correct.


77-80: LGTM!

The SQL statement for creating the test_numbers flow correctly tests the date_bin function.


82-96: LGTM!

The SQL statements for inserting data and querying the out_num_cnt table are correct.


98-100: LGTM!

The SQL statements for dropping the flow and tables are correct.


102-134: LGTM!

The SQL statements for testing the date_trunc function are correct.

tests/cases/standalone/common/flow/df_func.result (7)

127-127: LGTM!

The comment clearly indicates the purpose of the following SQL statement results.


135-135: LGTM!

The results of the SQL statements for creating the numbers_input table are correct.


137-143: LGTM!

The results of the SQL statement for creating the test_numbers flow correctly test the date_bin function.


144-150: LGTM!

The results of the SQL statements for inserting data and querying the out_num_cnt table are correct.


151-176: LGTM!

The results of the SQL statements for testing the date_bin function are correct.


177-188: LGTM!

The results of the SQL statements for dropping the flow and tables are correct.


189-248: LGTM!

The results of the SQL statements for testing the date_trunc function are correct.

src/flow/src/compute/render/src_sink.rs (1)

99-100: Added logging for buffered rows in Source Operator.

The debug log provides useful information about the number of rows buffered by the Source Operator, which can help in diagnosing performance issues.

src/flow/src/repr.rs (1)

59-60: Introduced constant BATCH_SIZE.

The new constant BATCH_SIZE is defined as half of BROADCAST_CAP, which could help in optimizing memory usage and processing efficiency.

src/flow/src/compute/render/map.rs (1)

149-162: Enhanced error handling and temporal filtering in mfp_subgraph.

The changes ensure that the output from the future is empty if a temporal filter is not applied, enhancing the robustness of the function.

src/flow/src/adapter/node_context.rs (3)

68-81: Enhanced SourceSender with unbounded channels for send buffer.

The changes introduce unbounded channels for the send buffer, which can improve the efficiency of data sending and receiving.


96-114: Improved try_send_all method to handle send buffer logic.

The changes ensure that rows are sent from the buffer until it is empty or the broadcast channel is full, enhancing the robustness of the method.


130-137: Enhanced send_rows method to handle send buffer logic.

The changes ensure that rows are sent to the buffer and handled appropriately, enhancing the robustness of the method.

src/flow/src/transform/literal.rs (1)

194-276: Ensure robust handling of user-defined types.

User-defined types are handled, but ensure that all possible variations and edge cases are covered to avoid unexpected errors.

Do you have unit tests covering all variations of user-defined types?

src/flow/src/adapter.rs (1)

487-489: LGTM!

The function get_buf_size looks good and is correctly implemented.

src/flow/src/transform/expr.rs (1)

171-207: Ensure robust handling of scalar function arguments.

The function rewrite_scalar_function handles scalar function arguments but lacks comprehensive error handling for all potential cases. Ensure that all possible cases are covered to avoid unexpected errors.

Do you have unit tests covering all variations of scalar function arguments?

src/flow/src/expr/linear.rs (1)

591-593: LGTM! But verify the method usage in the codebase.

The code changes are approved.

However, ensure the new method is_temporal is used correctly in the codebase.

Verification successful

The is_temporal method is used correctly in the codebase.

The method is_temporal is used in a logical and appropriate manner within the src/flow/src/compute/render/map.rs file. The conditional check ensures that the output is empty if the mfp_plan is not temporal, which is consistent with the intended functionality.

  • src/flow/src/compute/render/map.rs
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the `is_temporal` method in the codebase.

# Test: Search for the method usage. Expect: Ensure the method is used correctly.
rg --type rust -A 5 $'is_temporal'

Length of output: 940


Script:

#!/bin/bash
# Description: Extract more lines around the usage of the `is_temporal` method in `src/flow/src/compute/render/map.rs`.

# Extract 10 lines before and after the usage of `is_temporal` method.
rg --type rust -A 10 -B 10 $'is_temporal' src/flow/src/compute/render/map.rs

Length of output: 689

src/flow/src/compute/render/reduce.rs Show resolved Hide resolved
src/flow/src/transform/literal.rs Show resolved Hide resolved
src/flow/src/transform/literal.rs Show resolved Hide resolved
src/flow/src/adapter.rs Outdated Show resolved Hide resolved
Copy link

codecov bot commented Jul 12, 2024

Codecov Report

Attention: Patch coverage is 28.48101% with 226 lines in your changes missing coverage. Please review.

Project coverage is 84.86%. Comparing base (63acc30) to head (a40915e).
Report is 13 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4347      +/-   ##
==========================================
- Coverage   85.18%   84.86%   -0.32%     
==========================================
  Files        1061     1068       +7     
  Lines      189477   191512    +2035     
==========================================
+ Hits       161397   162533    +1136     
- Misses      28080    28979     +899     

@discord9
Copy link
Contributor Author

Test this branch with tsbs and get result as follow: standalone 498610 rows/sec with one flow task, baseline is 529158.32 rows/sec, distributed(1 frontend, 1 datanode, 1 metasrv, 1 flownode all on local machine) at 410452.36 rows/sec with baseline being 506380.45 rows/sec

src/flow/src/adapter.rs Outdated Show resolved Hide resolved
src/flow/src/adapter.rs Outdated Show resolved Hide resolved
src/flow/src/transform/expr.rs Outdated Show resolved Hide resolved
src/flow/src/transform/expr.rs Outdated Show resolved Hide resolved
tests/cases/standalone/common/flow/df_func.sql Outdated Show resolved Hide resolved
src/flow/src/transform/literal.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6dee935 and 40d8614.

Files selected for processing (5)
  • src/flow/src/adapter.rs (6 hunks)
  • src/flow/src/transform/expr.rs (6 hunks)
  • src/flow/src/transform/literal.rs (2 hunks)
  • tests/cases/standalone/common/flow/basic.sql (1 hunks)
  • tests/cases/standalone/common/flow/df_func.sql (1 hunks)
Files skipped from review as they are similar to previous changes (3)
  • src/flow/src/adapter.rs
  • src/flow/src/transform/literal.rs
  • tests/cases/standalone/common/flow/basic.sql
Additional comments not posted (7)
tests/cases/standalone/common/flow/df_func.sql (2)

69-101: LGTM! Test case for date_bin is complete and correctly structured.

The SQL syntax and logic appear accurate, and the test case effectively validates the date_bin function.


103-134: LGTM! Test case for date_trunc is complete and correctly structured.

The SQL syntax and logic appear accurate, and the test case effectively validates the date_trunc function.

src/flow/src/transform/expr.rs (5)

Line range hint 46-75: LGTM! Function typename_to_cdt handles type name conversions correctly.

The function logic is comprehensive and correctly maps type names to their corresponding CDT values.


157-166: LGTM! Function build_proto_lit correctly builds a proto literal.

The function logic is straightforward and accurately constructs a proto literal from a substrait literal.


171-207: LGTM! Function rewrite_scalar_function correctly handles argument rewriting.

The function logic is comprehensive and accurately rewrites scalar function arguments to columns.


213-216: LGTM! Function from_substrait_to_datafusion_scalar_func correctly converts scalar functions.

The function logic is comprehensive and accurately converts substrait scalar functions to DataFusion scalar functions.


302-316: LGTM! Function typename_to_cdt handles type name conversions correctly.

The function logic is comprehensive and correctly maps type names to their corresponding CDT values.

@discord9 discord9 mentioned this pull request Jul 15, 2024
13 tasks
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 40d8614 and 33cdeff.

Files selected for processing (2)
  • src/flow/src/adapter/node_context.rs (6 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/flow/src/compute/render/map.rs
Additional comments not posted (4)
src/flow/src/adapter/node_context.rs (4)

68-71: LGTM! Verify the usage of new fields in SourceSender.

The new fields send_buf_tx and send_buf_rx are correctly added to handle backpressure and improve data flow.

However, ensure that these fields are used correctly throughout the codebase.

Verification successful

Verified: Proper usage of send_buf_tx and send_buf_rx in the SourceSender struct.

The fields send_buf_tx and send_buf_rx are correctly integrated and used within the SourceSender struct to handle data flow efficiently.

  • send_buf_tx is used to send data.
  • send_buf_rx is used to receive data with proper locking mechanisms.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of `send_buf_tx` and `send_buf_rx` in the codebase.

# Test: Search for the usage of `send_buf_tx` and `send_buf_rx`. Expect: Proper usage in the codebase.
rg --type rust -A 5 $'send_buf_tx'
rg --type rust -A 5 $'send_buf_rx'

Length of output: 3901


171-173: LGTM! Verify the usage of get_send_buf_size.

The get_send_buf_size method is well-implemented, ensuring thread safety and returning the total number of rows in all sender buffers.

However, ensure that this method is used correctly throughout the codebase.

Verification successful

LGTM! Verify the usage of get_send_buf_size.

The get_send_buf_size method is well-implemented, ensuring thread safety and returning the total number of rows in all sender buffers. Its usage in the src/flow/src/adapter.rs file appears to be correct and consistent with its intended functionality.

  • src/flow/src/adapter.rs:
    • Line 1: self.node_context.read().await.get_send_buf_size().await
    • Line 2: (ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
    • Line 3: Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of `get_send_buf_size` in the codebase.

# Test: Search for the usage of `get_send_buf_size`. Expect: Proper usage in the codebase.
rg --type rust -A 5 $'get_send_buf_size'

Length of output: 1509


162-164: LGTM! Verify the usage of flush_all_sender.

The flush_all_sender method is well-implemented, handling errors and ensuring that all sender buffers are flushed.

However, ensure that this method is used correctly throughout the codebase.

Verification successful

The usage of flush_all_sender is correct.

The method flush_all_sender is properly awaited and its result is handled correctly in both blocking and non-blocking contexts. Errors are appropriately logged and managed.

  • src/flow/src/adapter.rs: Lines around the usage of flush_all_sender.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of `flush_all_sender` in the codebase.

# Test: Search for the usage of `flush_all_sender`. Expect: Proper usage in the codebase.
rg --type rust -A 5 $'flush_all_sender'

Length of output: 1108


Script:

#!/bin/bash
# Description: Retrieve more context around the usage of `flush_all_sender` in `src/flow/src/adapter.rs`.

# Extracting 10 lines before and after each occurrence of `flush_all_sender` for better context.
rg --type rust -A 10 -B 10 'flush_all_sender' src/flow/src/adapter.rs

Length of output: 1089


130-135: LGTM! Verify the usage of send_rows.

The send_rows method is well-implemented, handling errors and ensuring that rows are sent to the buffer.

However, ensure that this method is used correctly throughout the codebase.

Verification successful

Verified: The send_rows method is used correctly within the codebase.

The send_rows method is only used within the src/flow/src/adapter/node_context.rs file, and its implementation and usage are correct.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of `send_rows` in the codebase.

# Test: Search for the usage of `send_rows`. Expect: Proper usage in the codebase.
rg --type rust -A 5 $'send_rows'

Length of output: 874


Script:

#!/bin/bash
# Description: Verify the usage of `send_rows` throughout the entire codebase.

# Test: Search for the usage of `send_rows`. Expect: Proper usage in the codebase.
rg --type rust 'send_rows'

Length of output: 217

src/flow/src/adapter/node_context.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 33cdeff and c092b72.

Files selected for processing (3)
  • src/flow/src/adapter.rs (10 hunks)
  • src/flow/src/adapter/node_context.rs (5 hunks)
  • src/operator/src/insert.rs (1 hunks)
Files skipped from review due to trivial changes (1)
  • src/operator/src/insert.rs
Files skipped from review as they are similar to previous changes (2)
  • src/flow/src/adapter.rs
  • src/flow/src/adapter/node_context.rs

@zhongzc zhongzc enabled auto-merge July 15, 2024 09:04
@discord9 discord9 mentioned this pull request Jul 15, 2024
3 tasks
@zhongzc zhongzc added this pull request to the merge queue Jul 15, 2024
Merged via the queue into GreptimeTeam:main with commit 2b912d9 Jul 15, 2024
46 of 53 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
docs-not-required This change does not impact docs.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants