Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flow cli for distributed #4226

Merged
merged 15 commits into from
Jul 1, 2024
Merged

Conversation

discord9
Copy link
Contributor

@discord9 discord9 commented Jun 28, 2024

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

Add command line options for flownode, but didn't add FrontendInvoker for it now since PR size is alreay too large

Please explain IN DETAIL what the changes are in this PR and why they are needed:

Summary

NOTE: flownode service is still not working, since the FrontendInvoker is not yet set, so the flow's computing result will not be send back to frontend yet, this is planned in a future PR

Checklist

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.

Summary by CodeRabbit

  • New Features

    • Introduced a new module for managing Greptime Flownode, including starting, stopping, and configuration.
    • Added a new Flownode subcommand to enhance command-line functionality.
  • Bug Fixes

    • Improved error handling for server start, shutdown, and address parsing.
  • Refactor

    • Renamed FlownodeManager to FlowWorkerManager and updated related references.
    • Streamlined import paths and reorganized module structures for better maintainability.

Copy link
Contributor

coderabbitai bot commented Jun 28, 2024

Walkthrough

The recent updates enhance system structure, error handling, and server management across various files in the project. Key changes include adding new modules for flownode management, refining error responses, and improving server initialization processes with advanced configuration options. Additionally, the heartbeating functionality for flownodes gets a refactor, improving how services communicate and maintain their state within a distributed environment.

Changes

File Path Summary of Changes
src/flow/src/adapter.rs Major updates to structures and management of flow workers.
src/flow/src/adapter/node_context.rs Minor updates for error handling in node contexts.
src/flow/src/error.rs Introduced new error handling variants for server operations.
src/flow/src/heartbeat.rs Enhanced heartbeating mechanism with added atomic control.
src/flow/src/server.rs & src/cmd/src/flownode.rs Refactored server setup and initialization; added flownode module with comprehensive management setup.
src/cmd/src/... Multiple CLI updates for error handling and command structuring.
src/flow/src/expr/... Streamlined error module import paths in expression handling files.
src/flow/src/lib.rs & others Adjusted module structure and public exports for clearer access and modular control.

Sequence Diagram(s)

sequenceDiagram
    participant CLI as Command Line Interface
    participant FD as Flownode
    participant FM as FlowWorkerManager
    participant MC as MetaClient
    participant HB as HeartbeatTask

    CLI->>FD: Start
    activate FD
    FD->>FM: Initialize
    activate FM
    FM->>MC: Setup MetaClient
    activate MC
    MC-->>FM: Client Ready
    deactivate MC
    FM->>HB: Launch Heartbeat
    activate HB
    HB-->>FM: Heartbeat Running
    deactivate HB
    FD-->>CLI: Flownode Running
    deactivate FM
    deactivate FD
    Note over FD,HB: Flownode operates with\nactive heartbeating and meta client connectivity.
Loading

Poem

🐇✨ While the moon softly glowed above,
This little rabbit tweaked the code with love.
🌿 Now servers hum with lesser fuss,
Errors shy away, no more to discuss.
In fields wide, where data roams,
🚀 Flownodes now find their homes.

Warning

Review ran into problems

Problems (1)
  • Git: Failed to clone repository. Please contact CodeRabbit support.

Tip

Early access features: enabled

We are currently testing the following features in early access:

  • OpenAI gpt-4o model for code reviews and chat: OpenAI claims that this model is better at understanding and generating code than the previous models. We seek your feedback over the next few weeks before making it generally available.

Note:

  • You can enable or disable early access features from the CodeRabbit UI or by updating the CodeRabbit configuration file.
  • Please join our Discord Community to provide feedback and report issues.
  • OSS projects are currently opted into early access features by default.

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot added the docs-not-required This change does not impact docs. label Jun 28, 2024
@discord9 discord9 changed the title feat: flow cli for distrbuted feat: flow cli for distributed Jun 28, 2024
@discord9 discord9 marked this pull request as ready for review June 28, 2024 09:36
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Outside diff range and nitpick comments (2)
src/flow/src/compute/render/map.rs (2)

Line range hint 67-67: Clarify the purpose of Error import here.

The Error import seems to be standalone without direct usage in the visible code. If it's used, ensure it's clear in the context; otherwise, consider removing it to clean up the imports.


Line range hint 457-463: Check error handling in the build method call.

The use of BoxedError::new might obscure the underlying error type, which can complicate debugging. Consider using more specific error handling or propagation techniques to maintain clarity.

- .map_err(BoxedError::new)
+ .context(SpecificErrorContext)?
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b6585e3 and db85f81.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (33)
  • src/cmd/src/bin/greptime.rs (3 hunks)
  • src/cmd/src/error.rs (2 hunks)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/lib.rs (1 hunks)
  • src/cmd/src/standalone.rs (5 hunks)
  • src/flow/Cargo.toml (1 hunks)
  • src/flow/src/adapter.rs (8 hunks)
  • src/flow/src/adapter/flownode_impl.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (1 hunks)
  • src/flow/src/adapter/table_source.rs (1 hunks)
  • src/flow/src/adapter/util.rs (1 hunks)
  • src/flow/src/adapter/worker.rs (1 hunks)
  • src/flow/src/compute/render.rs (1 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
  • src/flow/src/compute/render/reduce.rs (1 hunks)
  • src/flow/src/compute/render/src_sink.rs (1 hunks)
  • src/flow/src/error.rs (2 hunks)
  • src/flow/src/expr/func.rs (1 hunks)
  • src/flow/src/expr/linear.rs (1 hunks)
  • src/flow/src/expr/relation/func.rs (1 hunks)
  • src/flow/src/expr/scalar.rs (4 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/lib.rs (1 hunks)
  • src/flow/src/plan.rs (1 hunks)
  • src/flow/src/repr/relation.rs (1 hunks)
  • src/flow/src/server.rs (4 hunks)
  • src/flow/src/transform.rs (1 hunks)
  • src/flow/src/transform/aggr.rs (1 hunks)
  • src/flow/src/transform/expr.rs (1 hunks)
  • src/flow/src/transform/literal.rs (1 hunks)
  • src/flow/src/transform/plan.rs (1 hunks)
  • src/meta-client/src/client.rs (1 hunks)
  • tests-integration/src/standalone.rs (2 hunks)
Files skipped from review due to trivial changes (12)
  • src/cmd/src/lib.rs
  • src/flow/Cargo.toml
  • src/flow/src/adapter/util.rs
  • src/flow/src/adapter/worker.rs
  • src/flow/src/compute/render.rs
  • src/flow/src/compute/render/src_sink.rs
  • src/flow/src/expr/func.rs
  • src/flow/src/expr/linear.rs
  • src/flow/src/repr/relation.rs
  • src/flow/src/transform/aggr.rs
  • src/flow/src/transform/expr.rs
  • src/flow/src/transform/plan.rs
Additional comments not posted (43)
src/flow/src/lib.rs (2)

30-30: Make heartbeat module public

This change exposes the heartbeat module, which likely contains functionality that needs to be accessible due to the new flownode features.


37-39: Expose flownode-related types

Exposing FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions, Error, Result, FlownodeBuilder, FlownodeInstance, and FlownodeServer from their respective modules. This is a necessary change to facilitate easier access and integration of the flownode functionality across the project.

src/cmd/src/bin/greptime.rs (3)

20-20: Include flownode in imports

Adding flownode to the imports list is crucial for using its functionality within this file, particularly for the CLI commands that manage the flownode service.


40-42: Add flownode service to CLI

This addition to the SubCommand enum enables the CLI to handle commands related to the flownode service, aligning with the new distributed system features.


79-84: Implement flownode service start command

This function implements the asynchronous operation to start the flownode service, using the configuration loaded from the CLI. It's a critical part of enabling the flownode functionality from the command line.

src/flow/src/adapter/table_source.rs (1)

23-24: Refine imports and error handling

Importing TableName and additional error types improves the clarity and maintainability of error handling within the TableSource class.

src/flow/src/error.rs (2)

166-192: Add new error types for server and client operations

These new error types (StartServer, ShutdownServer, MetaClientInit, ParseAddr) are essential for robust error handling in the context of starting and stopping servers, and initializing meta clients. This is particularly important for the reliability and maintainability of the flownode functionality.


Line range hint 200-224: Update status code function for new errors

The status_code function update is crucial to ensure that the new error types are correctly mapped to their respective status codes, which is important for consistent error handling across the system.

src/flow/src/adapter/flownode_impl.rs (3)

29-30: Refine imports and error handling in flownode implementation

Importing FlowWorkerManager and InternalSnafu enhances the clarity and maintainability of the flownode implementation, particularly in handling internal errors.


41-41: Implement handle function for flownode

This implementation of the handle function in the Flownode trait allows for handling various types of requests (create, drop) asynchronously, which is essential for the flownode's operation within the system.


41-41: Implement handle_inserts function for flownode

This implementation of the handle_inserts function allows the flownode to process insert requests asynchronously, handling data consistency and error situations effectively.

src/flow/src/plan.rs (4)

26-26: Imports are correctly added.

The new imports for error handling and expression utilities are correctly placed and used effectively in the file.


26-26: Method mfp is well-implemented.

The method correctly handles the application of map-filter-project operations with appropriate error handling and logical flow.


26-26: Method projection is correctly implemented.

The projection logic and error handling in this method are correctly implemented, maintaining the integrity of the data flow plan.


26-26: Method filter functions as expected.

The addition of a filter to the plan is implemented correctly, with appropriate error handling and logical consistency.

src/flow/src/heartbeat.rs (5)

17-31: Imports are correctly added.

The new imports for synchronization, error handling, and meta client utilities are correctly placed and used effectively in the file.


Line range hint 49-66: Constructor new is well-implemented.

The constructor of HeartbeatTask correctly initializes all fields and handles default values appropriately.


71-78: Method start functions as expected.

The start logic and error handling in this method are correctly implemented, ensuring the heartbeat task starts appropriately.


99-109: Method close functions as expected.

The close operation in this method is implemented correctly, with appropriate error handling and logical consistency.


227-259: Function new_metasrv_client is well-implemented.

The logic for creating a new meta server client and error handling in this function are correctly implemented, ensuring robust client creation.

tests-integration/src/standalone.rs (1)

Line range hint 159-224: Method build_with is well-implemented.

The method correctly handles the building of a standalone database instance with appropriate error handling and logical flow.

src/flow/src/transform/literal.rs (1)

29-29: Function from_substrait_literal is well-implemented.

The function correctly handles the conversion of Substrait literals into Value and ConcreteDataType with appropriate error handling and logical flow.

src/flow/src/server.rs (3)

106-116: Constructor new is well-implemented.

The constructor of FlownodeServer correctly initializes all fields and handles default values appropriately.


177-205: Method start functions as expected.

The start logic and error handling in this method are correctly implemented, ensuring the flow node server starts appropriately.


185-206: Method start in FlownodeInstance is correctly implemented.

The start operation in this method is implemented correctly, with appropriate error handling and logical consistency.

src/flow/src/expr/relation/func.rs (1)

27-27: Review of Added Imports:

The newly added import statement introduces several error handling modules which are essential for managing exceptions in the aggregate functions defined in this file. This is a good practice as it ensures robust error handling.

src/cmd/src/flownode.rs (6)

15-35: Review of Imports:

The imports are appropriately organized and cover a broad range of functionalities from error handling to configuration management, which are necessary for the operations defined in this file.


47-69: Check Implementation of Instance Struct:

The Instance struct is well-implemented with clear encapsulation of flownode and logging guards. The constructor and accessor methods are correctly defined, promoting good encapsulation practices.


71-87: Review App Implementation for Instance:

The App trait implementation for Instance is concise and leverages contextual error handling using Snafu. The methods start and stop are straightforward, making the code easy to understand and maintain.


89-105: Check Command Struct and Methods:

The Command struct is well-defined with clear subcommand handling. The methods build and load_options are efficiently implemented, utilizing pattern matching to delegate to the appropriate subcommand. This structure enhances modularity and maintainability.


107-118: Review SubCommand Enum and Its Implementation:

The SubCommand enum is minimalistic and well-suited for extension. The build method in its implementation effectively delegates the building process to the nested StartCommand, adhering to the single responsibility principle.


120-296: Detailed Check on StartCommand Struct and Methods:

The StartCommand struct handles CLI options and their merging into FlownodeOptions. The methods are well-documented and handle edge cases, such as missing configuration, appropriately. The async build method is particularly well-implemented, integrating various components and handling potential errors gracefully.

src/flow/src/transform.rs (2)

40-40: Review of Added Import:

The import of FlownodeContext is crucial for the functionality of transforming SQL to a flow plan as it likely provides necessary context for the operation. This import is justified and correctly placed.


41-41: Check Error Handling Modifications:

The expanded error handling coverage in this file is appropriate for the additional complexities introduced by the new functionalities. It ensures robust and user-friendly error reporting.

src/cmd/src/error.rs (2)

90-102: Review New Error Variants for flownode:

The new error variants StartFlownode and ShutdownFlownode are well-defined and include appropriate metadata for debugging. These additions are necessary for robust error handling in the newly introduced flownode functionalities.


397-399: Check Status Code Method Update:

The update to the status_code method to include the new flownode errors is correctly implemented. This ensures that the error handling remains consistent and informative across the system.

src/flow/src/adapter/node_context.rs (1)

27-27: Review of Added Imports:

The added imports for error handling are essential for the operations defined in this file, particularly for handling table not found errors and internal errors. This is a good practice as it ensures robust error handling.

src/flow/src/adapter.rs (1)

80-101: Ensure default values align with system expectations.

The default configuration for FlownodeOptions is critical. Verify that these defaults align with the expected production and development configurations to prevent unexpected behaviors.

src/cmd/src/standalone.rs (1)

525-530: Set up proper lifecycle management for flownode.

The method set_frontend_invoker is crucial for integrating the flownode with the frontend. Ensure that this integration is tested thoroughly to prevent issues in production.

src/meta-client/src/client.rs (1)

103-108: New Method: flownode_default_options

The addition of the flownode_default_options method aligns with the PR objectives to support the flownode functionality. This method correctly initializes a MetaClientBuilder instance with the Flownode role and enables necessary features like store and heartbeat.

src/flow/src/expr/scalar.rs (2)

35-35: Updated Error Imports and Definitions

The inclusion of additional error types such as UnsupportedTemporalFilterSnafu in the imports section is crucial for handling specific error scenarios in scalar expressions. This change supports enhanced error granularity and better error messaging, which is essential for debugging and user feedback.


307-307: Enhanced Error Handling in Scalar Function Implementations

The updates to error handling in methods like from_proto and get_fn_impl improve the clarity and specificity of error messages. This is crucial for troubleshooting and maintaining the robustness of the system. The use of context-specific error tags helps in quickly identifying the source of issues.

Also applies to: 318-318, 394-394

src/flow/src/compute/render/reduce.rs (1)

28-28: Imports and struct declaration look appropriate.

The new imports and the ReduceArrange struct are well-integrated and serve the expected functionalities within the file.

src/flow/src/compute/render/reduce.rs Show resolved Hide resolved
src/flow/src/compute/render/map.rs Show resolved Hide resolved
src/flow/src/adapter.rs Show resolved Hide resolved
src/cmd/src/error.rs Outdated Show resolved Hide resolved
src/flow/src/heartbeat.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@killme2008 killme2008 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job! After this PR, can we add some integration tests for flow into tests-integration?

src/cmd/src/flownode.rs Outdated Show resolved Hide resolved
src/cmd/src/flownode.rs Outdated Show resolved Hide resolved
src/flow/src/error.rs Outdated Show resolved Hide resolved
src/flow/src/error.rs Outdated Show resolved Hide resolved
src/flow/src/error.rs Outdated Show resolved Hide resolved
src/flow/src/heartbeat.rs Outdated Show resolved Hide resolved
src/flow/src/server.rs Show resolved Hide resolved
src/flow/src/server.rs Outdated Show resolved Hide resolved
src/flow/src/server.rs Outdated Show resolved Hide resolved
src/flow/src/server.rs Outdated Show resolved Hide resolved
@discord9 discord9 changed the title feat: flow cli for distributed feat!: flow cli for distributed Jul 1, 2024
@github-actions github-actions bot added breaking-change This pull request contains breaking changes. docs-required This change requires docs update. and removed docs-not-required This change does not impact docs. labels Jul 1, 2024
@github-actions github-actions bot added docs-not-required This change does not impact docs. and removed docs-required This change requires docs update. labels Jul 1, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Outside diff range and nitpick comments (1)
src/flow/src/heartbeat.rs (1)

Line range hint 211-290: Handling of heartbeat response stream in start_handle_resp_stream.

The function properly handles various states of the response stream, including errors and stream completion. The retry logic on error is a critical feature for maintaining the service's robustness. Consider adding more detailed logging at each step to improve traceability and debugging.

- error!(e; "Occur error while reading heartbeat response");
+ error!(e; "Error occurred while reading heartbeat response. Retrying...");
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between db85f81 and 63691f6.

Files selected for processing (10)
  • src/cmd/src/datanode.rs (1 hunks)
  • src/cmd/src/error.rs (2 hunks)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/frontend.rs (2 hunks)
  • src/flow/src/adapter.rs (8 hunks)
  • src/flow/src/adapter/node_context.rs (3 hunks)
  • src/flow/src/error.rs (2 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/server.rs (4 hunks)
  • tests/runner/src/env.rs (2 hunks)
Files skipped from review as they are similar to previous changes (4)
  • src/cmd/src/error.rs
  • src/flow/src/adapter.rs
  • src/flow/src/adapter/node_context.rs
  • src/flow/src/error.rs
Additional comments not posted (13)
src/flow/src/heartbeat.rs (4)

49-49: Addition of running field to HeartbeatTask struct.

The new running field is a good addition for controlling the execution state of the heartbeat task. Ensure that it is used correctly in the start and shutdown methods to prevent race conditions.


99-107: Concurrency control in HeartbeatTask.shutdown().

Similar to the start method, the use of compare_exchange in shutdown is appropriate for ensuring that the task is not shut down multiple times. However, adding an informational log when shutdown is called multiple times could be beneficial for debugging.


228-260: New method new_metasrv_client for creating MetaService clients.

This method efficiently handles the creation of MetaService clients, including setting timeouts and managing TCP connections. Ensure that the error handling is robust, especially for network-related errors.


71-78: Concurrency control in HeartbeatTask.start().

The use of compare_exchange for managing concurrency in starting the heartbeat task is appropriate. However, consider adding an error log or exception if starting the task multiple times is considered an error state.

src/flow/src/server.rs (3)

53-59: Addition of manager field to FlowService struct.

The addition of the manager field allows for centralized management of flow operations. Ensure that access to this manager is synchronized across different parts of the application to prevent race conditions.


107-117: Constructor method for FlownodeServer.

The constructor method correctly initializes the FlownodeServer with its dependencies. Ensure that all necessary fields are properly initialized to prevent null reference errors.


214-299: Builder pattern implementation in FlownodeBuilder.

The builder pattern is well implemented, providing a clear and flexible way to construct a FlownodeInstance. Ensure that all optional components are handled correctly to avoid runtime errors.

src/cmd/src/flownode.rs (2)

47-60: Addition of _guard field to Instance struct.

The _guard field is used to manage the lifecycle of logging workers, which is essential for proper logging functionality. Ensure that this field is properly maintained throughout the application's lifecycle to prevent premature dropping of logs.


71-87: Implementation of App trait for Instance.

The implementation of the App trait allows for standardized application control flows such as start and stop. Ensure that error handling is robust, particularly for operations that might fail during startup or shutdown.

src/cmd/src/frontend.rs (2)

150-150: Addition of metasrv_addrs to StartCommand.

The addition of metasrv_addrs aligns with the PR's objective to handle multiple meta service addresses. Ensure that this change is well-documented and tested, especially since it is a breaking change.


286-286: Cache registry creation in StartCommand.build().

The method for creating cache registries is well implemented, ensuring efficient management of metadata caching. Consider adding helper functions to simplify this complex setup, as indicated by the TODO comment.

- // TODO(discord9): add helper function to ease the creation of cache registry&such
+ // Refactor: Consider implementing a helper function to streamline cache registry creation.

Likely invalid or redundant comment.

src/cmd/src/datanode.rs (1)

128-128: Approved: Addition of metasrv_addrs CLI option.

The new CLI option metasrv_addrs correctly expects a list of strings, facilitating the transition to a distributed metadata server setup. Ensure that this new option is well integrated and tested with the rest of the system, particularly where metadata server addresses are utilized.

Verification successful

Verified: Integration of metasrv_addrs CLI option.

The new metasrv_addrs CLI option is well integrated across the system, including configurations, heartbeat modules, and tests. The widespread and consistent usage of metasrv_addrs confirms that the transition to support distributed metadata servers has been thoroughly implemented and tested.

  • Integration points include:
    • Configuration handling (e.g., src/common/config/src/config.rs)
    • Heartbeat modules (e.g., src/flow/src/heartbeat.rs)
    • Tests (e.g., src/cmd/tests/load_config_test.rs)
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify integration of `metasrv_addrs` with system components.
rg --type rust 'metasrv_addrs'

Length of output: 3205

tests/runner/src/env.rs (1)

230-230: Approved: Integration of metasrv_addrs in test environment setup.

The use of metasrv_addrs in the test environment setup scripts is consistent with the changes in the main application. This ensures that tests will accurately reflect the distributed nature of the metadata servers. Verify that this setup correctly initializes all components during testing.

Also applies to: 306-306

src/flow/src/server.rs Outdated Show resolved Hide resolved
src/cmd/src/flownode.rs Show resolved Hide resolved
@discord9
Copy link
Contributor Author

discord9 commented Jul 1, 2024

Good job! After this PR, can we add some integration tests for flow into tests-integration?

Will add some tests&docs update after impl FrontendInvoker and make flownode actually runnable in next PR

src/cmd/src/flownode.rs Show resolved Hide resolved
src/flow/src/heartbeat.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

Outside diff range and nitpick comments (4)
src/flow/src/compute/render/reduce.rs (4)

Line range hint 41-43: Optimize the handling of distinct inputs.

The function can be optimized to handle distinct inputs more efficiently, possibly by caching results or reducing redundant calculations.

- let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
+ let distinct_input = self.compute_state.cache_or_create_distinct_input(&reduce_plan);

Line range hint 45-46: Improve error handling in reduce_subgraph.

Consider enhancing the error handling in this function to provide more informative error messages and recover from potential failures more gracefully.

- reduce_subgraph(
-     &reduce_arrange,
+ if let Err(e) = reduce_subgraph(
+     &reduce_arrange,
      data,
      &key_val_plan,
      &reduce_plan,
      SubgraphArg {
          now: *now.borrow(),
          err_collector: &err_collector,
          scheduler: &scheduler_inner,
          send,
-     },
- );
+     },
+ ) {
+     err_collector.push_err(e);
+ }

Line range hint 48-51: Enhance performance and clarity of reduce_subgraph.

The function can be optimized for better performance and clarity. Consider refactoring to improve the structure and efficiency of the reduce operations.

- reduce_subgraph(
-     &reduce_arrange,
+ let reduced_data = reduce_subgraph(
      data,
      &key_val_plan,
      &reduce_plan,
      SubgraphArg {
          now: *now.borrow(),
          err_collector: &err_collector,
          scheduler: &scheduler_inner,
          send,
-     },
- );
+     },
+ )?;
+ send.give(reduced_data);

Line range hint 53-56: Consider simplifying render_reduce function.

The function is complex and handles multiple operations. Consider breaking it down into smaller, more manageable functions to improve readability and maintainability.

- pub fn render_reduce(
+ pub fn render_reduce(
    &mut self,
    input: Box<TypedPlan>,
    key_val_plan: KeyValPlan,
    reduce_plan: ReducePlan,
    output_type: RelationType,
) -> Result<CollectionBundle, Error> {
    let input = self.render_plan(*input)?;
    let bundle = self.reduce_plan_to_bundle(input, key_val_plan, reduce_plan, output_type)?;
    Ok(bundle)
}

fn reduce_plan_to_bundle(
    &mut self,
    input: Collection,
    key_val_plan: KeyValPlan,
    reduce_plan: ReducePlan,
    output_type: RelationType,
) -> Result<CollectionBundle, Error> {
    // existing implementation
}
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 63691f6 and 6cd9c75.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (38)
  • src/cmd/src/bin/greptime.rs (3 hunks)
  • src/cmd/src/datanode.rs (1 hunks)
  • src/cmd/src/error.rs (2 hunks)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/frontend.rs (2 hunks)
  • src/cmd/src/lib.rs (1 hunks)
  • src/cmd/src/standalone.rs (5 hunks)
  • src/flow/Cargo.toml (1 hunks)
  • src/flow/src/adapter.rs (8 hunks)
  • src/flow/src/adapter/flownode_impl.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (3 hunks)
  • src/flow/src/adapter/table_source.rs (1 hunks)
  • src/flow/src/adapter/util.rs (1 hunks)
  • src/flow/src/adapter/worker.rs (1 hunks)
  • src/flow/src/compute/render.rs (1 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
  • src/flow/src/compute/render/reduce.rs (1 hunks)
  • src/flow/src/compute/render/src_sink.rs (1 hunks)
  • src/flow/src/error.rs (2 hunks)
  • src/flow/src/expr/func.rs (1 hunks)
  • src/flow/src/expr/linear.rs (1 hunks)
  • src/flow/src/expr/relation/func.rs (1 hunks)
  • src/flow/src/expr/scalar.rs (4 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/lib.rs (1 hunks)
  • src/flow/src/plan.rs (1 hunks)
  • src/flow/src/repr/relation.rs (1 hunks)
  • src/flow/src/server.rs (4 hunks)
  • src/flow/src/transform.rs (1 hunks)
  • src/flow/src/transform/aggr.rs (1 hunks)
  • src/flow/src/transform/expr.rs (1 hunks)
  • src/flow/src/transform/literal.rs (1 hunks)
  • src/flow/src/transform/plan.rs (1 hunks)
  • src/meta-client/src/client.rs (1 hunks)
  • src/meta-srv/src/cache_invalidator.rs (1 hunks)
  • src/meta-srv/src/service/mailbox.rs (5 hunks)
  • tests-integration/src/standalone.rs (2 hunks)
  • tests/runner/src/env.rs (2 hunks)
Files skipped from review due to trivial changes (9)
  • src/cmd/src/lib.rs
  • src/flow/Cargo.toml
  • src/flow/src/adapter/util.rs
  • src/flow/src/adapter/worker.rs
  • src/flow/src/compute/render/map.rs
  • src/flow/src/expr/func.rs
  • src/flow/src/repr/relation.rs
  • src/flow/src/transform/aggr.rs
  • src/flow/src/transform/expr.rs
Additional comments not posted (72)
src/flow/src/lib.rs (2)

28-28: Add Documentation for New Module.

The heartbeat module has been added. Consider adding a brief comment explaining its purpose.

mod heartbeat; // Handles heartbeat tasks for Flownode.

30-33: Update Documentation for Server Module.

The server module now includes FlowWorkerManager. Update the documentation to reflect this.

mod server; // Manages server operations including FlowWorkerManager.
src/meta-srv/src/cache_invalidator.rs (1)

71-74: Add Logging for Flownode Broadcast.

Consider adding logging for the Flownode broadcast to help with debugging and monitoring.

self.mailbox
    .broadcast(&BroadcastChannel::Flownode, msg)
    .await
    .map_err(BoxedError::new)
    .context(meta_error::ExternalSnafu)?;
log::info!("Broadcasted to Flownode: {:?}", msg);
src/cmd/src/bin/greptime.rs (2)

20-20: Sort Imports Alphabetically.

Consider sorting the imports alphabetically for better readability and maintenance.

use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};

40-43: Add Documentation for Flownode Subcommand.

Consider adding a brief comment explaining the purpose of the flownode subcommand.

/// Start flownode service.
#[clap(name = "flownode")]
Flownode(flownode::Command),
src/meta-srv/src/service/mailbox.rs (5)

36-36: Add Documentation for Flownode Channel.

Consider adding a brief comment explaining the purpose of the Flownode channel variant.

Flownode(u64), // Channel for Flownode communication

48-50: Add Documentation for Flownode Channel Display.

Consider adding a brief comment explaining the Display implementation for the Flownode channel.

Channel::Flownode(id) => {
    write!(f, "Flownode-{}", id)
}

67-67: Add Documentation for Flownode BroadcastChannel.

Consider adding a brief comment explaining the purpose of the Flownode broadcast channel variant.

Flownode, // Broadcast channel for Flownode communication

81-84: Add Documentation for Flownode BroadcastChannel Range.

Consider adding a brief comment explaining the pusher_range implementation for the Flownode broadcast channel.

BroadcastChannel::Flownode => Range {
    start: format!("{}-", Role::Flownode as i32),
    end: format!("{}-", Role::Flownode as i32 + 1),
},

60-60: Ensure Consistent Role Naming.

Ensure that the role naming for Flownode is consistent with other roles.

Verification successful

Ensure Consistent Role Naming.

The role naming for Flownode is consistent with other roles within the Role enum.

  • src/common/meta/src/cluster.rs:100-104
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the role naming consistency for Flownode.

# Test: Search for the role definition. Expect: Consistent naming.
rg --type rust $'Role::Flownode'

Length of output: 1020



Script:

#!/bin/bash
# Description: Locate the definition of the Role enum and analyze its structure and usage.

# Locate the definition of the Role enum.
ast-grep --lang rust --pattern 'enum Role { $$$ }'

Length of output: 52



Script:

#!/bin/bash
# Description: Locate the definition of the Role enum using ripgrep.

# Search for the definition of the Role enum.
rg --type rust 'enum Role'

Length of output: 75



Script:

#!/bin/bash
# Description: Extract the definition of the Role enum from the located file.

# Extract the definition of the Role enum from src/common/meta/src/cluster.rs.
ast-grep --lang rust --pattern 'pub enum Role { $$$ }' src/common/meta/src/cluster.rs

Length of output: 369

src/flow/src/adapter/table_source.rs (5)

Line range hint 72-80: Verify error handling consistency.

Ensure that the use of with_context for error handling is consistent with the rest of the codebase.

Verification successful

Line range hint 72-80: Verified error handling consistency.

The use of with_context for error handling is consistent with the rest of the codebase.

  • src/servers/src/tls.rs
  • src/store-api/src/region_request.rs
  • src/common/time/src/timestamp.rs
  • src/table/src/metadata.rs
  • src/datanode/src/region_server.rs
  • src/common/meta/src/ddl_manager.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling using `with_context`.

# Test: Search for other instances of `with_context` usage. Expect: Consistent error handling.
rg --type rust 'with_context'

Length of output: 28072


Line range hint 85-115: Verify error handling consistency.

Ensure that the use of with_context for error handling is consistent with the rest of the codebase.

Verification successful

Consistent Error Handling with with_context Verified

The use of with_context for error handling in the provided snippet is consistent with the rest of the codebase, particularly in similar contexts involving table-related operations.

  • Instances of with_context usage for table-related errors can be found in various files, such as src/query/src/sql.rs, src/datanode/src/region_server.rs, src/script/src/table.rs, and src/frontend/src/instance.rs.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling using `with_context`.

# Test: Search for other instances of `with_context` usage. Expect: Consistent error handling.
rg --type rust 'with_context'

Length of output: 28072


Line range hint 31-39: Verify error handling consistency.

Ensure that the use of with_context for error handling is consistent with the rest of the codebase.

Verification successful

Line range hint 31-39: Verified error handling consistency.

The usage of with_context for error handling in src/flow/src/adapter/table_source.rs is consistent with the rest of the codebase.

  • Instances of with_context found in src/flow/src/adapter/table_source.rs:
    • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
    • src/flow/src/adapter/table_source.rs: .with_context(|| UnexpectedSnafu {
    • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
    • src/flow/src/adapter/table_source.rs: .with_context(|| UnexpectedSnafu {
    • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
    • src/flow/src/adapter/table_source.rs: .with_context(|| TableNotFoundSnafu {

These instances align with the general pattern observed in other files.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling using `with_context`.

# Test: Search for other instances of `with_context` usage. Expect: Consistent error handling.
rg --type rust 'with_context'

Length of output: 28072


Line range hint 44-52: Verify error handling consistency.

Ensure that the use of with_context for error handling is consistent with the rest of the codebase.

Verification successful

Verify error handling consistency in src/flow/src/adapter/table_source.rs.

The usage of with_context for error handling in the function get_table_id_from_proto_name appears consistent with other instances in the codebase. This can be confirmed by examining the specific instances in src/flow/src/adapter/table_source.rs:

  • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
  • src/flow/src/adapter/table_source.rs: .with_context(|| UnexpectedSnafu {
  • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
  • src/flow/src/adapter/table_source.rs: .with_context(|| UnexpectedSnafu {
  • src/flow/src/adapter/table_source.rs: .with_context(|_| TableNotFoundMetaSnafu {
  • src/flow/src/adapter/table_source.rs: .with_context(|| TableNotFoundSnafu {

These usages indicate that the error handling pattern is consistent within the file and aligns with the rest of the codebase.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling using `with_context`.

# Test: Search for other instances of `with_context` usage. Expect: Consistent error handling.
rg --type rust 'with_context'

Length of output: 28072


Line range hint 57-67: Verify error handling consistency.

Ensure that the use of context for error handling is consistent with the rest of the codebase.

src/flow/src/error.rs (4)

166-171: Verify error handling consistency.

Ensure that the error handling for StartServer is consistent with the rest of the codebase.

Verification successful

Verified error handling consistency.

The error handling for StartServer is consistent with the rest of the codebase.

  • src/frontend/src/server.rs
  • src/frontend/src/instance.rs
  • src/flow/src/server.rs
  • src/datanode/src/datanode.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling for `StartServer`.

# Test: Search for other instances of `StartServer` usage. Expect: Consistent error handling.
rg --type rust 'StartServer'

Length of output: 2036


173-178: Verify error handling consistency.

Ensure that the error handling for ShutdownServer is consistent with the rest of the codebase.

Verification successful

Error handling for ShutdownServer is consistent.

The error handling for the ShutdownServer variant is consistent across the codebase.

  • src/frontend/src/instance.rs
  • src/meta-srv/src/bootstrap.rs
  • src/flow/src/server.rs
  • src/datanode/src/datanode.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling for `ShutdownServer`.

# Test: Search for other instances of `ShutdownServer` usage. Expect: Consistent error handling.
rg --type rust 'ShutdownServer'

Length of output: 1170


180-185: Verify error handling consistency.

Ensure that the error handling for MetaClientInit is consistent with the rest of the codebase.

Verification successful

Error handling for MetaClientInit is consistent.

The usage of MetaClientInitSnafu for error handling is consistent across the codebase, as seen in src/flow/src/heartbeat.rs and src/datanode/src/heartbeat.rs.

  • src/flow/src/heartbeat.rs
  • src/datanode/src/heartbeat.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling for `MetaClientInit`.

# Test: Search for other instances of `MetaClientInit` usage. Expect: Consistent error handling.
rg --type rust 'MetaClientInit'

Length of output: 866



Script:

#!/bin/bash
# Description: Inspect usage of `MetaClientInitSnafu` in identified files for consistent error handling.

# Check usage in src/flow/src/heartbeat.rs
echo "Checking src/flow/src/heartbeat.rs"
rg --context 5 'MetaClientInitSnafu' src/flow/src/heartbeat.rs

# Check usage in src/datanode/src/heartbeat.rs
echo "Checking src/datanode/src/heartbeat.rs"
rg --context 5 'MetaClientInitSnafu' src/datanode/src/heartbeat.rs

Length of output: 2229


187-192: Verify error handling consistency.

Ensure that the error handling for ParseAddr is consistent with the rest of the codebase.

Verification successful

Error handling for ParseAddr is consistent.

The error handling for the ParseAddr variant is consistent across the codebase. Each instance uses the context method with the ParseAddrSnafu context, and the error is appropriately mapped to StatusCode::InvalidArguments.

  • Files with consistent error handling:
    • src/meta-srv/src/bootstrap.rs
    • src/meta-srv/src/error.rs
    • src/flow/src/server.rs
    • src/flow/src/error.rs
    • src/frontend/src/server.rs
    • src/frontend/src/error.rs
    • src/datanode/src/error.rs
    • src/datanode/src/service.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling for `ParseAddr`.

# Test: Search for other instances of `ParseAddr` usage. Expect: Consistent error handling.
rg --type rust 'ParseAddr'

Length of output: 1181



Script:

#!/bin/bash
# Description: Verify the consistency of error handling for `ParseAddr` in the identified files.

# Test: Extract lines around the usage of `ParseAddr` to check for consistent error handling.

# Extracting lines around usage in bootstrap.rs
rg -A 5 -B 5 'ParseAddr' src/meta-srv/src/bootstrap.rs

# Extracting lines around usage in error.rs (meta-srv)
rg -A 5 -B 5 'ParseAddr' src/meta-srv/src/error.rs

# Extracting lines around usage in server.rs (flow)
rg -A 5 -B 5 'ParseAddr' src/flow/src/server.rs

# Extracting lines around usage in error.rs (flow)
rg -A 5 -B 5 'ParseAddr' src/flow/src/error.rs

# Extracting lines around usage in server.rs (frontend)
rg -A 5 -B 5 'ParseAddr' src/frontend/src/server.rs

# Extracting lines around usage in error.rs (frontend)
rg -A 5 -B 5 'ParseAddr' src/frontend/src/error.rs

# Extracting lines around usage in error.rs (datanode)
rg -A 5 -B 5 'ParseAddr' src/datanode/src/error.rs

# Extracting lines around usage in service.rs (datanode)
rg -A 5 -B 5 'ParseAddr' src/datanode/src/service.rs

Length of output: 5847

src/flow/src/adapter/flownode_impl.rs (2)

Line range hint 77-138: Verify error handling consistency and fetch order logic.

Ensure that the error handling using map_err is consistent with the rest of the codebase. Additionally, verify the efficiency and correctness of the fetch order logic.

Verification successful

Error handling and fetch order logic verified

The error handling using map_err in the handle_inserts function is consistent with other instances across the codebase. The fetch order logic appears efficient and correctly implemented.

  • Instances of map_err across the codebase show consistent error handling.
  • The fetch order logic is implemented correctly and efficiently.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the consistency of error handling using `map_err` and the efficiency and correctness of the fetch order logic.

# Test: Search for other instances of `map_err` usage. Expect: Consistent error handling.
rg --type rust 'map_err'

# Test: Verify the efficiency and correctness of the fetch order logic. Expect: Efficient and correct logic.
rg --type rust 'fetch_order'

Length of output: 67499


Line range hint 41-75: Verify error handling consistency and match arm coverage.

Ensure that the error handling using map_err is consistent with the rest of the codebase. Additionally, verify that all match arms are covered.

src/flow/src/compute/render/src_sink.rs (3)

Line range hint 34-80: Verify error handling consistency and robustness of broadcast error handling.

Ensure that the error handling using try_recv is consistent with the rest of the codebase. Additionally, verify the robustness of handling broadcast errors.


Line range hint 101-154: Verify error handling consistency and robustness of broadcast error handling.

Ensure that the error handling using send is consistent with the rest of the codebase. Additionally, verify the robustness of handling broadcast errors.


Line range hint 82-99: Verify error handling consistency and address TODO comment.

Ensure that the error handling using send is consistent with the rest of the codebase. Additionally, address the TODO comment for handling errors.

src/flow/src/plan.rs (1)

26-26: Import Change Approved

The addition of Error from crate::error is appropriate and aligns with the error handling updates in the file.

tests-integration/src/standalone.rs (3)

159-159: Ensure Flownode Build Success

The use of unwrap() ensures that the build process does not fail silently. This is acceptable in a test context.


Line range hint 163-224: Integrate Flownode with Frontend and Run in Background

The usage of flow_worker_manager to set the frontend invoker and run in the background is necessary for the integration and operation of the flownode.


224-224: Run Flownode in Background

Running the flow_worker_manager in the background ensures continuous operation of the flownode, which is essential for the test setup.

src/flow/src/transform/literal.rs (1)

29-29: Import Change Approved

The addition of Error, NotImplementedSnafu, and PlanSnafu from crate::error is appropriate and aligns with the error handling updates in the file.

src/flow/src/expr/relation/func.rs (1)

27-27: Import Change Approved

The addition of DatafusionSnafu, Error, and InvalidQuerySnafu from crate::error is appropriate and aligns with the error handling updates in the file.

src/flow/src/heartbeat.rs (4)

82-82: Consider adding more detailed logging for troubleshooting.

Adding detailed logging for each step can aid in troubleshooting and monitoring the heartbeat task.

info!("Heartbeat task initialization complete. Starting heartbeat reporting and response handling.");

104-104: Consider adding more detailed logging for troubleshooting.

Adding detailed logging for each step can aid in troubleshooting and monitoring the shutdown process.

info!("Heartbeat task shutdown initiated.");

240-272: LGTM!

The function new_metasrv_client is well-implemented with proper error handling and logging.


Line range hint 157-204: LGTM!

The function start_handle_resp_stream is well-implemented with proper error handling and retry logic.

src/flow/src/server.rs (4)

Line range hint 53-68: LGTM!

The FlowService struct and its implementation are well-implemented with proper error handling.


107-117: LGTM!

The FlownodeServer struct and its implementation are well-implemented with proper error handling and logging.


178-207: Consider adding more detailed logging for troubleshooting.

Adding detailed logging for each step can aid in troubleshooting and monitoring the flownode instance lifecycle.

info!("Flownode instance start initiated.");
info!("Flownode instance shutdown initiated.");

215-300: LGTM!

The FlownodeBuilder struct and its implementation are well-implemented with proper error handling and logging.

src/cmd/src/flownode.rs (4)

47-87: LGTM!

The Instance struct and its implementation are well-implemented with proper error handling and logging.


89-105: LGTM!

The Command struct and its implementation are well-implemented with proper error handling and logging.


107-118: LGTM!

The SubCommand enum and its implementation are well-implemented with proper error handling and logging.


120-301: Consider adding more detailed logging for troubleshooting.

Adding detailed logging for each step can aid in troubleshooting and monitoring the flownode start command.

info!("StartCommand options loaded: {:#?}", opts);
info!("StartCommand build process initiated.");
src/flow/src/transform.rs (5)

Line range hint 63-96: LGTM!

The FunctionExtensions struct and its implementation are well-implemented with proper error handling.


Line range hint 99-118: LGTM!

The function sql_to_flow_plan is well-implemented with proper error handling and logging.


Line range hint 121-123: LGTM!

The function register_function_to_query_engine is well-implemented and correctly registers the TumbleFunction.


Line range hint 126-156: LGTM!

The TumbleFunction struct and its implementation are well-implemented. The eval method correctly returns an error for unimplemented functionality.


Line range hint 159-237: LGTM!

The test module is well-implemented with proper setup and assertions for testing the transformation functionality.

src/cmd/src/error.rs (2)

90-102: Add more descriptive error messages for StartFlownode and ShutdownFlownode.

Consider providing more context in the error messages to help with debugging.

-    #[snafu(display("Failed to start flownode"))]
+    #[snafu(display("Failed to start flownode at {}", location))]
-    #[snafu(display("Failed to shutdown flownode"))]
+    #[snafu(display("Failed to shutdown flownode at {}", location))]

397-399: Ensure status_code method handles StartFlownode and ShutdownFlownode correctly.

The status_code method correctly maps the new error variants to their respective status codes.

src/flow/src/compute/render.rs (1)

37-37: Ensure consistency in error handling.

The render_plan method now uses NotImplementedSnafu for handling unsupported operations. Ensure this is consistent with other methods in the file.

Verification successful

Consistent error handling verified.

The NotImplementedSnafu error handling for unsupported operations is consistently applied within the render_plan method.

  • render_plan method: Consistent usage for Plan::Join and Plan::Union.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify consistency in error handling for unsupported operations.

# Test: Search for `NotImplementedSnafu` usage in the file. Expect: Consistent usage.
rg --type rust $'NotImplementedSnafu' src/flow/src/compute/render.rs

Length of output: 267



Script:

#!/bin/bash
# Description: Verify consistency in error handling for unsupported operations within methods.

# List all methods in the file
ast-grep --lang rust --pattern 'fn $_($$$) { $$$ }' src/flow/src/compute/render.rs

# Search for NotImplementedSnafu usage within methods
rg --type rust 'NotImplementedSnafu' src/flow/src/compute/render.rs -A 5 -B 5

Length of output: 8318

src/flow/src/transform/plan.rs (1)

25-25: Ensure consistency in error handling.

The from_substrait_plan method now uses InvalidQuerySnafu for handling invalid queries. Ensure this is consistent with other methods in the file.

Verification successful

Ensure consistency in error handling.

The from_substrait_plan method uses InvalidQuerySnafu for handling invalid queries, which is consistent with other methods in the file.

  • src/flow/src/transform/plan.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify consistency in error handling for invalid queries.

# Test: Search for `InvalidQuerySnafu` usage in the file. Expect: Consistent usage.
rg --type rust $'InvalidQuerySnafu' src/flow/src/transform/plan.rs

Length of output: 361



Script:

#!/bin/bash
# Description: Verify consistency in error handling for other error types.

# Test: Search for other Snafu error types usage in the file. Expect: Consistent usage in similar contexts.
rg --type rust 'PlanSnafu|UnexpectedSnafu|InternalSnafu|NotImplementedSnafu' src/flow/src/transform/plan.rs

Length of output: 283

tests/runner/src/env.rs (2)

230-230: Ensure correct usage of metasrv-addrs.

The new CLI option metasrv-addrs is added. Verify that this change is correctly integrated and all references to the old metasrv-addr are updated.

Verification successful

The new CLI option metasrv-addrs is correctly integrated, and all references to the old metasrv-addr have been updated.

  • The old option metasrv-addr is no longer present in the codebase.
  • The new option metasrv-addrs is used correctly where applicable.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all references to `metasrv-addrs` are correctly updated.

# Test: Search for the usage of `metasrv-addrs` across the codebase. Expect: Only occurances of the new option.
rg --type rust $'metasrv-addrs'

Length of output: 209



Script:

#!/bin/bash
# Description: Verify all references to the old option `metasrv-addr` are removed or replaced.

# Test: Search for the usage of `metasrv-addr` across the codebase. Expect: No occurrences of the old option.
rg --type rust $'metasrv-addr'

Length of output: 208


306-306: Ensure correct usage of metasrv-addrs.

The new CLI option metasrv-addrs is added. Verify that this change is correctly integrated and all references to the old metasrv-addr are updated.

Verification successful

Ensure correct usage of metasrv-addrs.

The new CLI option metasrv-addrs is added and correctly integrated. All references to the old metasrv-addr have been updated.

  • Verified in tests/runner/src/env.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all references to `metasrv-addrs` are correctly updated.

# Test: Search for the usage of `metasrv-addrs` across the codebase. Expect: Only occurances of the new option.
rg --type rust $'metasrv-addrs'

Length of output: 209



Script:

#!/bin/bash
# Description: Verify all references to `metasrv-addr` are correctly updated.

# Test: Search for the usage of `metasrv-addr` across the codebase. Expect: No occurrences of the old option.
rg --type rust $'metasrv-addr'

Length of output: 208

src/flow/src/adapter.rs (12)

24-24: Import Acknowledgement: Configurable

The import of Configurable is noted and seems relevant to the new implementation for FlownodeOptions.


29-29: Import Acknowledgement: LoggingOptions and TracingOptions

The imports of LoggingOptions and TracingOptions are noted and seem relevant to the new fields in FlownodeOptions.


35-35: Import Acknowledgement: MetaClientOptions

The import of MetaClientOptions is noted and seems relevant to the new field in FlownodeOptions.


39-39: Import Acknowledgement: HeartbeatOptions

The import of HeartbeatOptions is noted and seems relevant to the new field in FlownodeOptions.


45-45: Import Acknowledgement: watch, Mutex, and RwLock

The imports of watch, Mutex, and RwLock are noted and seem relevant to the new implementation of FlowWorkerManager.


52-52: Import Modification: error Module

The modification of the error module import is noted and seems relevant to the new error variants added.


55-55: Import Modification: sql_to_flow_plan

The modification of the sql_to_flow_plan import is noted and seems relevant to the new functionalities related to the flow plan.


67-67: Import Acknowledgement: Error

The import of Error is noted and seems relevant to the new error handling implemented in the file.


80-90: New Fields in FlownodeOptions

The addition of new fields (mode, cluster_id, meta_client, logging, tracing, heartbeat) in FlownodeOptions is noted and seems to support new functionalities.


93-103: Updated Implementation of Default for FlownodeOptions

The updated implementation of Default for FlownodeOptions is noted and sets default values for the new fields.


108-108: Implementation of Configurable for FlownodeOptions

The implementation of Configurable for FlownodeOptions is noted and seems to support configuration management.


Line range hint 111-131: Renaming and Updating FlownodeManager to FlowWorkerManager

The renaming of FlownodeManager to FlowWorkerManager and the updates in the implementation are noted and seem to reflect the new architecture and functionalities.

src/cmd/src/standalone.rs (1)

525-530: Verify the correct setting of frontend_invoker and starting flow_worker_manager.

Ensure that the frontend_invoker is correctly set and that the flow_worker_manager is correctly started in the background.

Verification successful

Verification successful for setting frontend_invoker and starting flow_worker_manager.

The frontend_invoker is correctly set using the set_frontend_invoker method, and the flow_worker_manager is correctly started in the background using the run_background method.

  • set_frontend_invoker implementation: src/flow/src/adapter.rs
  • run_background implementation: src/flow/src/adapter.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the correct setting of `frontend_invoker` and starting `flow_worker_manager`.

# Test: Search for the `set_frontend_invoker` method call. Expect: Only occurances of the new signature.
rg --type rust $'set_frontend_invoker'

Length of output: 286



Script:

#!/bin/bash
# Description: Inspect the implementation of the `set_frontend_invoker` method and verify the correct usage of `run_background`.

# Step 1: Inspect the implementation of the `set_frontend_invoker` method in `src/flow/src/adapter.rs`.
ast-grep --lang rust --pattern 'fn set_frontend_invoker($_) { $$$ }' --json

# Step 2: Search for the `run_background` method to verify its implementation and usage.
rg --type rust 'fn run_background' --json

Length of output: 988



Script:

#!/bin/bash
# Description: Verify the implementation details of `set_frontend_invoker` and confirm the usage of `run_background`.

# Step 1: Search for the entire method definition of `set_frontend_invoker` in `src/flow/src/adapter.rs`.
rg --type rust 'fn set_frontend_invoker' --context 10 src/flow/src/adapter.rs

# Step 2: Search for the usage of `run_background` to confirm it starts `flow_worker_manager` correctly.
rg --type rust 'run_background' --context 10

Length of output: 5744

src/meta-client/src/client.rs (1)

103-109: Ensure the correct implementation of flownode_default_options.

The flownode_default_options method should follow the same pattern as other default options methods to ensure consistency.

Verification successful

Ensure pattern consistency with other default options methods.

Verify that the flownode_default_options method in MetaClientBuilder follows the same pattern as other default options methods to ensure consistency.


Let's list all public methods in the MetaClientBuilder implementation to identify any default options methods manually.


Ensure the correct implementation of flownode_default_options.

The flownode_default_options method follows the same pattern as other default options methods (frontend_default_options and datanode_default_options). All these methods use the Self::new method and chain additional methods like enable_store and enable_heartbeat. Therefore, the implementation is consistent.

  • frontend_default_options:

    pub fn frontend_default_options(cluster_id: ClusterId) -> Self {
        Self::new(cluster_id, 0, Role::Frontend)
            .enable_store()
            .enable_heartbeat()
            .enable_procedure()
            .enable_access_cluster_info()
    }
  • datanode_default_options:

    pub fn datanode_default_options(cluster_id: ClusterId, member_id: u64) -> Self {
        Self::new(cluster_id, member_id, Role::Datanode)
            .enable_store()
            .enable_heartbeat()
    }
  • flownode_default_options:

    pub fn flownode_default_options(cluster_id: ClusterId, member_id: u64) -> Self {
        Self::new(cluster_id, member_id, Role::Flownode)
            .enable_store()
            .enable_heartbeat()
    }
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the correct implementation of `flownode_default_options`.

# Test: Search for the `flownode_default_options` method call. Expect: Only occurances of the new signature.
rg --type rust $'flownode_default_options'

Length of output: 275



Script:

#!/bin/bash
# Extract other default options methods in MetaClientBuilder for comparison.

# Search for methods in MetaClientBuilder that follow the pattern of default options.
ast-grep --lang rust --pattern $'impl MetaClientBuilder {
  $$$
  pub fn $_default_options($_, $_) -> Self {
    $$$
  }
  $$$
}'

Length of output: 130



Script:

#!/bin/bash
# List all public methods in MetaClientBuilder for manual identification of default options methods.

# Search for all public methods in MetaClientBuilder.
ast-grep --lang rust --pattern $'impl MetaClientBuilder {
  $$$
  pub fn $_($_) -> $_ {
    $$$
  }
  $$$
}'

Length of output: 8821

src/flow/src/expr/scalar.rs (2)

287-287: Improve error context.

The addition of crate::error::ExternalSnafu context is a good practice to provide more detailed error information.


298-298: Improve error context.

The addition of crate::error::ExternalSnafu context is a good practice to provide more detailed error information.

src/flow/src/expr/linear.rs (1)

24-24: Add necessary imports.

The addition of Error and InvalidQuerySnafu imports is necessary for handling errors in the MapFilterProject struct.

src/flow/src/compute/render/reduce.rs (2)

28-28: Update import statement for PlanSnafu.

The new import statement for PlanSnafu is correct and aligns with the changes in the function body.


Line range hint 35-39: Add error handling for arranging handler.

The addition of error handling using PlanSnafu improves robustness by ensuring that the arrange handler is correctly initialized.

src/cmd/src/standalone.rs Show resolved Hide resolved
src/flow/src/adapter/node_context.rs Outdated Show resolved Hide resolved
src/flow/src/lib.rs Show resolved Hide resolved
src/meta-srv/src/cache_invalidator.rs Show resolved Hide resolved
src/meta-srv/src/cache_invalidator.rs Show resolved Hide resolved
src/cmd/src/frontend.rs Outdated Show resolved Hide resolved
src/cmd/src/bin/greptime.rs Show resolved Hide resolved
src/cmd/src/datanode.rs Outdated Show resolved Hide resolved
Copy link

codecov bot commented Jul 1, 2024

Codecov Report

Attention: Patch coverage is 29.74359% with 274 lines in your changes missing coverage. Please review.

Project coverage is 84.48%. Comparing base (214fd38) to head (c2a4a6d).
Report is 4 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4226      +/-   ##
==========================================
- Coverage   84.86%   84.48%   -0.38%     
==========================================
  Files        1045     1048       +3     
  Lines      184388   185525    +1137     
==========================================
+ Hits       156476   156746     +270     
- Misses      27912    28779     +867     

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 63691f6 and 6cd9c75.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (38)
  • src/cmd/src/bin/greptime.rs (3 hunks)
  • src/cmd/src/datanode.rs (1 hunks)
  • src/cmd/src/error.rs (2 hunks)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/frontend.rs (2 hunks)
  • src/cmd/src/lib.rs (1 hunks)
  • src/cmd/src/standalone.rs (5 hunks)
  • src/flow/Cargo.toml (1 hunks)
  • src/flow/src/adapter.rs (8 hunks)
  • src/flow/src/adapter/flownode_impl.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (3 hunks)
  • src/flow/src/adapter/table_source.rs (1 hunks)
  • src/flow/src/adapter/util.rs (1 hunks)
  • src/flow/src/adapter/worker.rs (1 hunks)
  • src/flow/src/compute/render.rs (1 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
  • src/flow/src/compute/render/reduce.rs (1 hunks)
  • src/flow/src/compute/render/src_sink.rs (1 hunks)
  • src/flow/src/error.rs (2 hunks)
  • src/flow/src/expr/func.rs (1 hunks)
  • src/flow/src/expr/linear.rs (1 hunks)
  • src/flow/src/expr/relation/func.rs (1 hunks)
  • src/flow/src/expr/scalar.rs (4 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/lib.rs (1 hunks)
  • src/flow/src/plan.rs (1 hunks)
  • src/flow/src/repr/relation.rs (1 hunks)
  • src/flow/src/server.rs (4 hunks)
  • src/flow/src/transform.rs (1 hunks)
  • src/flow/src/transform/aggr.rs (1 hunks)
  • src/flow/src/transform/expr.rs (1 hunks)
  • src/flow/src/transform/literal.rs (1 hunks)
  • src/flow/src/transform/plan.rs (1 hunks)
  • src/meta-client/src/client.rs (1 hunks)
  • src/meta-srv/src/cache_invalidator.rs (1 hunks)
  • src/meta-srv/src/service/mailbox.rs (5 hunks)
  • tests-integration/src/standalone.rs (2 hunks)
  • tests/runner/src/env.rs (2 hunks)
Files skipped from review due to trivial changes (4)
  • src/cmd/src/standalone.rs
  • src/flow/src/adapter/worker.rs
  • src/flow/src/compute/render.rs
  • src/meta-client/src/client.rs
Files skipped from review as they are similar to previous changes (30)
  • src/cmd/src/bin/greptime.rs
  • src/cmd/src/datanode.rs
  • src/cmd/src/error.rs
  • src/cmd/src/frontend.rs
  • src/cmd/src/lib.rs
  • src/flow/Cargo.toml
  • src/flow/src/adapter.rs
  • src/flow/src/adapter/flownode_impl.rs
  • src/flow/src/adapter/node_context.rs
  • src/flow/src/adapter/table_source.rs
  • src/flow/src/adapter/util.rs
  • src/flow/src/compute/render/map.rs
  • src/flow/src/compute/render/reduce.rs
  • src/flow/src/compute/render/src_sink.rs
  • src/flow/src/error.rs
  • src/flow/src/expr/func.rs
  • src/flow/src/expr/linear.rs
  • src/flow/src/expr/relation/func.rs
  • src/flow/src/expr/scalar.rs
  • src/flow/src/heartbeat.rs
  • src/flow/src/lib.rs
  • src/flow/src/plan.rs
  • src/flow/src/repr/relation.rs
  • src/flow/src/transform.rs
  • src/flow/src/transform/aggr.rs
  • src/flow/src/transform/expr.rs
  • src/flow/src/transform/literal.rs
  • src/flow/src/transform/plan.rs
  • tests-integration/src/standalone.rs
  • tests/runner/src/env.rs
Additional comments not posted (13)
src/meta-srv/src/cache_invalidator.rs (1)

60-74: Ensure proper error handling and logging.

The error handling for broadcasting messages to Flownode is consistent with Frontend. Ensure that any failures are logged for better traceability.

Consider adding logging for the broadcast operation.

info!("Broadcasting message to Flownode");
src/meta-srv/src/service/mailbox.rs (2)

Line range hint 36-50: Consistent naming for channel variants.

The addition of the Flownode variant is consistent with existing variants. Ensure that all necessary parts of the system are updated to handle Flownode messages.


Line range hint 60-84: Ensure proper handling of Flownode messages.

The pusher_id and pusher_range methods are updated to handle Flownode messages correctly. Ensure that these methods are thoroughly tested.

src/flow/src/server.rs (3)

18-18: Ensure necessary imports for Arc usage.

The import of Arc is essential for shared ownership of the FlowWorkerManagerRef. Ensure that Arc is used appropriately to avoid potential issues with ownership and concurrency.


20-47: Ensure proper usage of imported modules.

The added imports are necessary for the functionality of the FlowService and FlownodeServer. Ensure that all imported modules are used correctly and efficiently.


Line range hint 53-117: FlowService implementation looks good.

The FlowService implementation is straightforward and follows best practices. Ensure that the error handling logic in handle_create_remove and handle_mirror_request is robust.

src/cmd/src/flownode.rs (7)

1-13: Ensure license compliance.

The license header is correctly included. Ensure that all files in the project comply with the licensing requirements.


15-42: Ensure proper usage of imports.

The added imports are necessary for the functionality of the Instance and Command structs. Ensure that all imported modules are used correctly and efficiently.


47-69: Instance struct implementation looks good.

The Instance struct is well-implemented, providing methods to access and mutate the flownode instance. Ensure that the logging guard is kept as intended to prevent the worker from being dropped.


71-87: App trait implementation for Instance.

The App trait implementation for Instance is straightforward. Ensure that the start and stop methods handle all edge cases and log relevant information for debugging.


89-105: Command struct implementation looks good.

The Command struct and its methods are well-implemented. Ensure that the build and load_options methods handle all edge cases and provide meaningful error messages.


107-118: SubCommand enum implementation looks good.

The SubCommand enum and its methods are well-implemented. Ensure that the build method handles all edge cases and provides meaningful error messages.


120-301: StartCommand struct and methods.

The StartCommand struct and its methods are well-implemented. Ensure that the build method handles all edge cases and provides meaningful error messages. Consider adding more detailed logging for each step to aid in troubleshooting.

src/flow/src/server.rs Outdated Show resolved Hide resolved
Copy link
Member

@WenyXu WenyXu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6cd9c75 and 9c9a8a0.

Files selected for processing (5)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/frontend.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (3 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/server.rs (4 hunks)
Files skipped from review due to trivial changes (1)
  • src/cmd/src/frontend.rs
Files skipped from review as they are similar to previous changes (2)
  • src/flow/src/adapter/node_context.rs
  • src/flow/src/heartbeat.rs
Additional comments not posted (9)
src/flow/src/server.rs (5)

57-59: LGTM!

The FlowService::new function correctly initializes a FlowService instance.


112-117: LGTM!

The FlownodeServer::new function correctly initializes a FlownodeServer instance.


187-198: Consider adding detailed logging for each step.

Adding more detailed logging can aid in troubleshooting and provide better insights during the start process.

-        if let Some(task) = &self.heartbeat_task {
-            task.start().await?;
-        }
+        if let Some(task) = &self.heartbeat_task {
+            info!("Starting heartbeat task.");
+            task.start().await?;
+        }

-        self.addr = self
-            .server
-            .start(self.addr)
-            .await
-            .context(StartServerSnafu)?;
+        info!("Starting flownode server at address: {}", self.addr);
+        self.addr = self
+            .server
+            .start(self.addr)
+            .await
+            .context(StartServerSnafu)?;

199-207: Consider adding detailed logging for each step.

Adding more detailed logging can aid in troubleshooting and provide better insights during the shutdown process.

-        self.server.shutdown().await.context(ShutdownServerSnafu)?;
+        info!("Shutting down flownode server.");
+        self.server.shutdown().await.context(ShutdownServerSnafu)?;

-        if let Some(task) = &self.heartbeat_task {
-            task.shutdown();
-        }
+        if let Some(task) = &self.heartbeat_task {
+            info!("Shutting down heartbeat task.");
+            task.shutdown();
+        }

247-260: Consider adding detailed logging for each step.

Adding more detailed logging can aid in troubleshooting and provide better insights during the build process.

-        let manager = Arc::new(self.build_manager().await?);
+        info!("Building flow worker manager.");
+        let manager = Arc::new(self.build_manager().await?);

-        let server = FlownodeServer::new(FlowService::new(manager.clone()));
+        info!("Creating flownode server.");
+        let server = FlownodeServer::new(FlowService::new(manager.clone()));

-        let heartbeat_task = self.heartbeat_task;
+        let heartbeat_task = self.heartbeat_task;
+        if heartbeat_task.is_some() {
+            info!("Heartbeat task is set.");
+        }

-        let addr = self.opts.grpc.addr;
+        let addr = self.opts.grpc.addr;
+        info!("Using gRPC address: {}", addr);

-        let instance = FlownodeInstance {
-            server,
-            addr: addr.parse().context(ParseAddrSnafu { addr })?,
-            heartbeat_task,
-        };
+        let instance = FlownodeInstance {
+            server,
+            addr: addr.parse().context(ParseAddrSnafu { addr })?,
+            heartbeat_task,
+        };
+        info!("Flownode instance created.");
src/cmd/src/flownode.rs (4)

55-59: LGTM!

The Instance::new function correctly initializes an Instance with a FlownodeInstance and a vector of WorkerGuard.


77-79: Consider adding detailed logging for the start process.

Adding more detailed logging can aid in troubleshooting and provide better insights during the start process.

-        self.flownode.start().await.context(StartFlownodeSnafu)
+        info!("Starting flownode instance.");
+        self.flownode.start().await.context(StartFlownodeSnafu)

81-86: Consider adding detailed logging for the stop process.

Adding more detailed logging can aid in troubleshooting and provide better insights during the stop process.

-        self.flownode
-            .shutdown()
-            .await
-            .context(ShutdownFlownodeSnafu)
+        info!("Stopping flownode instance.");
+        self.flownode
+            .shutdown()
+            .await
+            .context(ShutdownFlownodeSnafu)

200-300: Consider adding detailed logging for each step.

Adding more detailed logging can aid in troubleshooting and provide better insights during the build process.

-        common_runtime::init_global_runtimes(&opts.runtime);
+        info!("Initializing global runtimes.");
+        common_runtime::init_global_runtimes(&opts.runtime);

-        let guard = common_telemetry::init_global_logging(
-            APP_NAME,
-            &opts.component.logging,
-            &opts.component.tracing,
-            opts.component.node_id.map(|x| x.to_string()),
-        );
+        info!("Initializing global logging.");
+        let guard = common_telemetry::init_global_logging(
+            APP_NAME,
+            &opts.component.logging,
+            &opts.component.tracing,
+            opts.component.node_id.map(|x| x.to_string()),
+        );
+        log_versions(version!(), short_version!());

-        info!("Flownode start command: {:#?}", self);
-        info!("Flownode options: {:#?}", opts);
+        info!("Processing flownode start command: {:#?}", self);
+        info!("Using flownode options: {:#?}", opts);

-        let opts = opts.component;
+        let opts = opts.component;

-        let cluster_id = opts.cluster_id.context(MissingConfigSnafu {
-            msg: "'cluster_id'",
-        })?;
+        let cluster_id = opts.cluster_id.context(MissingConfigSnafu {
+            msg: "'cluster_id'",
+        })?;
+        info!("Using cluster ID: {}", cluster_id);

-        let node_id = opts
-            .node_id
-            .context(MissingConfigSnafu { msg: "'node_id'" })?;
+        let node_id = opts
+            .node_id
+            .context(MissingConfigSnafu { msg: "'node_id'" })?;
+        info!("Using node ID: {}", node_id);

-        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
-            msg: "'meta_client_options'",
-        })?;
+        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
+            msg: "'meta_client_options'",
+        })?;
+        info!("Using meta client configuration.");

-        let meta_client = Arc::new(
-            flow::heartbeat::new_metasrv_client(cluster_id, node_id, meta_config)
-                .await
-                .context(StartFlownodeSnafu)?,
-        );
+        let meta_client = Arc::new(
+            flow::heartbeat::new_metasrv_client(cluster_id, node_id, meta_config)
+                .await
+                .context(StartFlownodeSnafu)?,
+        );
+        info!("Meta client created.");

-        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
-        let cache_ttl = meta_config.metadata_cache_ttl;
-        let cache_tti = meta_config.metadata_cache_tti;
+        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
+        let cache_ttl = meta_config.metadata_cache_ttl;
+        let cache_tti = meta_config.metadata_cache_tti;
+        info!("Cache configuration - max capacity: {}, TTL: {}, TTI: {}", cache_max_capacity, cache_ttl, cache_tti);

-        let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
-            .cache_max_capacity(cache_max_capacity)
-            .cache_ttl(cache_ttl)
-            .cache_tti(cache_tti)
-            .build();
+        let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
+            .cache_max_capacity(cache_max_capacity)
+            .cache_ttl(cache_ttl)
+            .cache_tti(cache_tti)
+            .build();
+        info!("Cached meta backend created.");

-        let cached_meta_backend = Arc::new(cached_meta_backend);
+        let cached_meta_backend = Arc::new(cached_meta_backend);

-        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
-            CacheRegistryBuilder::default()
-                .add_cache(cached_meta_backend.clone())
-                .build(),
-        );
+        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
+            CacheRegistryBuilder::default()
+                .add_cache(cached_meta_backend.clone())
+                .build(),
+        );
+        info!("Layered cache builder created.");

-        let fundamental_cache_registry =
-            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
+        let fundamental_cache_registry =
+            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
+        info!("Fundamental cache registry created.");

-        let layered_cache_registry = Arc::new(
-            with_default_composite_cache_registry(
-                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
-            )
-            .context(BuildCacheRegistrySnafu)?
-            .build(),
-        );
+        let layered_cache_registry = Arc::new(
+            with_default_composite_cache_registry(
+                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
+            )
+            .context(BuildCacheRegistrySnafu)?
+            .build(),
+        );
+        info!("Layered cache registry created.");

-        let catalog_manager = KvBackendCatalogManager::new(
-            opts.mode,
-            Some(meta_client.clone()),
-            cached_meta_backend.clone(),
-            layered_cache_registry.clone(),
-        );
+        let catalog_manager = KvBackendCatalogManager::new(
+            opts.mode,
+            Some(meta_client.clone()),
+            cached_meta_backend.clone(),
+            layered_cache_registry.clone(),
+        );
+        info!("Catalog manager created.");

-        let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend));
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend));

-        table_metadata_manager
-            .init()
-            .await
-            .context(InitMetadataSnafu)?;
+        table_metadata_manager
+            .init()
+            .await
+            .context(InitMetadataSnafu)?;
+        info!("Table metadata manager initialized.");

-        let executor = HandlerGroupExecutor::new(vec![
-            Arc::new(ParseMailboxMessageHandler),
-            Arc::new(InvalidateTableCacheHandler::new(
-                layered_cache_registry.clone(),
-            )),
-        ]);
+        let executor = HandlerGroupExecutor::new(vec![
+            Arc::new(ParseMailboxMessageHandler),
+            Arc::new(InvalidateTableCacheHandler::new(
+                layered_cache_registry.clone(),
+            )),
+        ]);
+        info!("Handler group executor created.");

-        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
-            &opts,
-            meta_client.clone(),
-            opts.heartbeat.clone(),
-            Arc::new(executor),
-        );
+        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
+            &opts,
+            meta_client.clone(),
+            opts.heartbeat.clone(),
+            Arc::new(executor),
+        );
+        info!("Heartbeat task created.");

-        let flownode_builder = FlownodeBuilder::new(
-            opts,
-            Plugins::new(),
-            table_metadata_manager,
-            catalog_manager,
-        )
-        .with_heartbeat_task(heartbeat_task);
+        let flownode_builder = FlownodeBuilder::new(
+            opts,
+            Plugins::new(),
+            table_metadata_manager,
+            catalog_manager,
+        )
+        .with_heartbeat_task(heartbeat_task);
+        info!("Flownode builder created.");

-        let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
+        let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
+        info!("Flownode instance built.");

-        Ok(Instance::new(flownode, guard))
+        Ok(Instance::new(flownode, guard))
+        info!("Instance created.");

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6cd9c75 and 1fa90e8.

Files selected for processing (6)
  • src/cmd/src/flownode.rs (1 hunks)
  • src/cmd/src/frontend.rs (1 hunks)
  • src/flow/src/adapter/node_context.rs (3 hunks)
  • src/flow/src/heartbeat.rs (5 hunks)
  • src/flow/src/server.rs (4 hunks)
  • src/meta-srv/src/service/mailbox.rs (5 hunks)
Files skipped from review due to trivial changes (1)
  • src/cmd/src/frontend.rs
Additional comments not posted (13)
src/meta-srv/src/service/mailbox.rs (6)

36-36: Add documentation for the new Channel::Flownode variant.

Adding documentation comments for the new Channel::Flownode variant will improve code readability and maintainability.

/// Represents a Flownode channel.
Flownode(u64),

48-50: Add documentation for the new Channel::Flownode variant in Display implementation.

Adding documentation comments for the new Channel::Flownode variant in the Display implementation will enhance code clarity.

/// Formats the Flownode channel for display.
Channel::Flownode(id) => {
    write!(f, "Flownode-{}", id)
}

60-60: Add documentation for the new Channel::Flownode variant in pusher_id method.

Adding documentation comments for the new Channel::Flownode variant in the pusher_id method will improve code readability.

/// Returns the pusher ID for the Flownode channel.
Channel::Flownode(id) => format!("{}-{}", Role::Flownode as i32, id),

67-67: Add documentation for the new BroadcastChannel::Flownode variant.

Adding documentation comments for the new BroadcastChannel::Flownode variant will improve code readability and maintainability.

/// Represents a Flownode broadcast channel.
Flownode,

79-83: Add documentation for the new BroadcastChannel::Flownode variant in pusher_range method.

Adding documentation comments for the new BroadcastChannel::Flownode variant in the pusher_range method will enhance code clarity.

/// Returns the pusher range for the Flownode broadcast channel.
BroadcastChannel::Flownode => Range {
    start: format!("{}-", Role::Flownode as i32),
    end: format!("{}-", Role::Flownode as i32 + 1),
},

157-160: Add test cases for the new BroadcastChannel::Flownode variant.

The test cases for the new BroadcastChannel::Flownode variant are well-written and ensure the correctness of the pusher_range method.

src/flow/src/heartbeat.rs (1)

103-112: Add detailed logging for the shutdown process.

Adding more detailed logging for each step of the shutdown process will aid in troubleshooting and ensure proper tracking of the task's lifecycle.

info!("Initiating shutdown of the heartbeat task for flownode");
if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
    warn!("Heartbeat task already shut down or never started");
}
info!("Heartbeat task for flownode has been shut down");
src/flow/src/server.rs (2)

107-117: Add detailed logging for the shutdown process in FlownodeServer.

Adding more detailed logging for each step of the shutdown process will aid in troubleshooting and ensure proper tracking of the server's lifecycle.

info!("Initiating shutdown of the flow node server.");
if let Some(tx) = shutdown_tx.take() {
    if tx.send(()).is_err() {
        info!("Receiver dropped, the flow node server has already shut down.");
    }
} else {
    info!("Flow node server was not running.");
}
info!("Flow node server has been shut down.");

179-211: Add detailed logging for the start and shutdown process in FlownodeInstance.

Adding more detailed logging for each step of the start and shutdown process will aid in troubleshooting and ensure proper tracking of the instance's lifecycle.

pub async fn start(&mut self) -> Result<(), crate::Error> {
    if let Some(task) = &self.heartbeat_task {
        task.start().await?;
    }

    info!("Starting the flow node server at address: {}", self.addr);
    self.addr = self
        .server
        .start(self.addr)
        .await
        .context(StartServerSnafu)?;
    info!("Flow node server started at address: {}", self.addr);
    Ok(())
}

pub async fn shutdown(&self) -> Result<(), crate::Error> {
    info!("Shutting down the flow node server.");
    self.server.shutdown().await.context(ShutdownServerSnafu)?;

    if let Some(task) = &self.heartbeat_task {
        task.shutdown();
    }
    info!("Flow node server has been shut down.");
    Ok(())
}
src/cmd/src/flownode.rs (2)

47-69: Consider adding detailed logging for the lifecycle events in Instance.

Adding more detailed logging for each lifecycle event (start, stop) will aid in troubleshooting and ensure proper tracking of the instance's state.

impl Instance {
    pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
        info!("Creating a new flownode instance.");
        Self {
            flownode,
            _guard: guard,
        }
    }

    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
        &mut self.flownode
    }

    pub fn flownode(&self) -> &FlownodeInstance {
        &self.flownode
    }
}

89-105: Ensure all edge cases are handled in Command methods.

The command handling logic is well-structured. Ensure that all edge cases are handled, particularly in error scenarios where configuration might be incomplete or invalid.

src/flow/src/adapter/node_context.rs (2)

27-27: Ensure consistent usage of TableNotFoundSnafu.

The TableNotFoundSnafu error handling has been added to the imports. Ensure it is used consistently throughout the file.

Verification successful

Consistent usage of TableNotFoundSnafu verified.

The TableNotFoundSnafu error handling has been added to the imports and is used consistently throughout the file.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify consistent usage of `TableNotFoundSnafu` in the file.

# Test: Search for `TableNotFoundSnafu` usage in the file. Expect: Consistent usage.
rg --type rust $'TableNotFoundSnafu' src/flow/src/adapter/node_context.rs

Length of output: 622


329-332: Ensure correctness of error handling in assign_table_schema.

The assign_table_schema method has been updated to include error handling using TableNotFoundSnafu. Ensure the error handling is correctly implemented.

Verification successful

Verified: Correctness of error handling in assign_table_schema.

The assign_table_schema method includes appropriate error handling using TableNotFoundSnafu when the table is not found in the table_repr.

  • src/flow/src/adapter/node_context.rs lines 329-332
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify correctness of error handling in `assign_table_schema`.

# Test: Search for `assign_table_schema` method implementation and verify error handling.
rg --type rust $'fn assign_table_schema' src/flow/src/adapter/node_context.rs

Length of output: 110



Script:

#!/bin/bash
# Retrieve the full implementation of the `assign_table_schema` method to verify error handling.
ast-grep --lang rust --pattern $'fn assign_table_schema($_) {
  $$$
}' src/flow/src/adapter/node_context.rs

Length of output: 108



Script:

#!/bin/bash
# Retrieve the full implementation of the `assign_table_schema` method to verify error handling.
rg --type rust -A 50 'fn assign_table_schema' src/flow/src/adapter/node_context.rs

Length of output: 1816

src/flow/src/heartbeat.rs Show resolved Hide resolved
src/flow/src/heartbeat.rs Show resolved Hide resolved
src/flow/src/heartbeat.rs Show resolved Hide resolved
src/flow/src/server.rs Show resolved Hide resolved
src/cmd/src/flownode.rs Outdated Show resolved Hide resolved
@discord9 discord9 mentioned this pull request Jul 1, 2024
3 tasks
@discord9 discord9 changed the title feat!: flow cli for distributed feat: flow cli for distributed Jul 1, 2024
@github-actions github-actions bot added docs-required This change requires docs update. and removed docs-not-required This change does not impact docs. labels Jul 1, 2024
@github-actions github-actions bot added docs-not-required This change does not impact docs. and removed docs-required This change requires docs update. labels Jul 1, 2024
@discord9 discord9 enabled auto-merge July 1, 2024 09:44
@discord9 discord9 added this pull request to the merge queue Jul 1, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 1fa90e8 and c2a4a6d.

Files selected for processing (1)
  • src/cmd/src/flownode.rs (1 hunks)
Files not reviewed due to errors (1)
  • src/cmd/src/flownode.rs (Error: Server error. Please try again later.)

Merged via the queue into GreptimeTeam:main with commit f035a7c Jul 1, 2024
58 checks passed
@discord9 discord9 deleted the flow_cli branch July 1, 2024 10:13
v0y4g3r pushed a commit to v0y4g3r/greptimedb that referenced this pull request Jul 3, 2024
* feat(WIP): add FlownodeInstance for flow cli

* feat(WIP): cli

* feat: add merge opts func

* refactor: move server&error to src dir

* feat: flownode cli build

* feat: add `flownode` subcmd to cli

* refactor: per review

* refactor!: BREAKING remove alias `metasrv-addr`

* chore: after rebase

* feat: cache invalide flownode cache

* chore: small refactor per review

* chore: fix a typo

* feat!: revert breaking change

* chore: per review

* refactor: not accept `metasrv-addr` only for flownode
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change This pull request contains breaking changes. docs-not-required This change does not impact docs.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants