
feat(barrier): support database failure isolation (part 1, meta) #19664

Merged
merged 2 commits into main from yiming/database-failure-isolation-meta-part
Dec 11, 2024

Conversation

@wenym1 (Contributor) commented Dec 4, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In this PR, we implement the logic of database failure isolation in the meta global barrier manager.

We will add a new variant ReportDatabaseFailureResponse in StreamingControlStreamResponse to report the failure of a single database. After receiving such a response, instead of triggering a global recovery, we will only recover the streaming jobs of the failed database, and other databases won't be affected by the failure. Note that per-database recovery is only triggered by receiving a ReportDatabaseFailureResponse from a CN; receiving an error on the bidi-stream to any compute node will still trigger a global recovery. However, this PR only implements the logic in the meta global barrier manager, and the CN local barrier manager will not send any ReportDatabaseFailureResponse yet. Therefore, the per-database recovery logic will not be triggered by this PR alone; it will be triggered only after #19579, which implements the logic in the CN local barrier manager, is merged.

Previously, in CheckpointControl, we stored the runtime info of each database in DatabaseCheckpointControl. In this PR, each database is instead stored as a DatabaseCheckpointControlStatus, which can be either Running or Recovering.

enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

Recovery has two stages: Resetting and Initializing. In the Resetting stage, we send a ResetDatabaseRequest to all CNs and wait for a ResetDatabaseResponse from each of them. After receiving the ResetDatabaseResponse from all CNs, we enter the Initializing stage. By then, we will have ensured that no information about the reset database remains on any CN, and that there is no in-flight message of the database left in the bidi-stream. In the Initializing stage, we send initial barriers to all CNs to recreate the actors of the database. After we successfully collect the initial barriers from all CNs, the database is recovered and enters the normal Running state.
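Based on the diff quoted later in this thread (the temp_place_holder snippet), the Recovering variant's DatabaseRecoveringState appears to wrap a stage enum. A rough, hypothetical sketch of its shape is below: the Resetting field names come from that diff, while the types and the Initializing fields are guesses, and the real definition lives in src/meta/src/barrier/checkpoint/recovery.rs.

use std::collections::HashSet;

// Hypothetical placeholders for the worker id and the backoff timer type.
type WorkerId = u32;
type BackoffFuture = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>;

enum DatabaseRecoveringStage {
    Resetting {
        // CNs from which a ResetDatabaseResponse is still expected.
        remaining_workers: HashSet<WorkerId>,
        // CNs that have already responded (interpretation guessed from the name).
        reset_workers: HashSet<WorkerId>,
        // Guards against stale responses from an earlier reset round.
        reset_request_id: u32,
        // Optional backoff before (re)issuing the reset requests (guessed from the name).
        backoff_future: Option<BackoffFuture>,
    },
    Initializing {
        // Guessed: CNs whose initial barrier has not been collected yet.
        remaining_workers: HashSet<WorkerId>,
    },
}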

We can treat each database as a state machine with 3 states: Running, Resetting and Initializing. State transitions are triggered by receiving one of 3 response variants: ReportDatabaseFailure, BarrierComplete and DatabaseReset. The transition logic can be summarized as follows (a code sketch follows the list):

  • Running
    • on ReportDatabaseFailure
      • wait for the in-flight BarrierCompletingTask to finish, if any, and mark the database as blocked in the command queue
      • send ResetDatabaseRequest with reset_request_id set to 0 to all CNs, and save the reset_request_id and the set of nodes whose responses still need to be collected
      • enter the Resetting state
    • on BarrierComplete: update the DatabaseCheckpointControl
    • on DatabaseReset: unreachable
  • Resetting
    • on ReportDatabaseFailure or BarrierComplete: ignore
    • on DatabaseReset:
      • if the reset_request_id in the response is less than the saved reset_request_id, ignore it
      • otherwise, mark the CN as collected
      • once responses have been collected from all CNs:
        • load the database runtime info from the catalog manager and fragment manager
        • inject the initial barrier to the CNs and save the set of nodes whose responses still need to be collected
        • enter the Initializing state
  • Initializing
    • on BarrierComplete:
      • mark the CN as collected
      • once responses have been collected from all CNs: enter Running
    • on ReportDatabaseFailure
      • increment the previously saved reset_request_id and send ResetDatabaseRequest to all CNs
      • enter Resetting
    • on DatabaseReset: unreachable
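
The sketch below is a minimal, self-contained Rust model of this transition table. All type and field names are illustrative stand-ins rather than the actual RisingWave types, and side effects (sending ResetDatabaseRequest, injecting barriers, updating DatabaseCheckpointControl) are reduced to comments.

use std::collections::HashSet;

type WorkerId = u32;

enum DatabaseState {
    Running,
    Resetting {
        reset_request_id: u32,
        remaining_workers: HashSet<WorkerId>,
    },
    Initializing {
        reset_request_id: u32,
        remaining_workers: HashSet<WorkerId>,
    },
}

enum Response {
    ReportDatabaseFailure,
    BarrierComplete { worker: WorkerId },
    DatabaseReset { worker: WorkerId, reset_request_id: u32 },
}

impl DatabaseState {
    /// Consume the current state plus a response and return the next state.
    fn on_response(self, resp: Response, all_workers: &HashSet<WorkerId>) -> DatabaseState {
        use DatabaseState::*;
        use Response::*;
        match (self, resp) {
            (Running, ReportDatabaseFailure) => {
                // Wait for any in-flight BarrierCompletingTask, block the command
                // queue, then send ResetDatabaseRequest with reset_request_id = 0.
                Resetting { reset_request_id: 0, remaining_workers: all_workers.clone() }
            }
            (Running, BarrierComplete { .. }) => Running, // update DatabaseCheckpointControl
            (Running, DatabaseReset { .. }) => unreachable!("no reset was requested"),
            (
                Resetting { reset_request_id, mut remaining_workers },
                DatabaseReset { worker, reset_request_id: resp_id },
            ) => {
                if resp_id >= reset_request_id {
                    remaining_workers.remove(&worker);
                } // else: stale response from an earlier reset round, ignore
                if remaining_workers.is_empty() {
                    // Load runtime info from the catalog/fragment manager and
                    // inject the initial barrier to all CNs.
                    Initializing { reset_request_id, remaining_workers: all_workers.clone() }
                } else {
                    Resetting { reset_request_id, remaining_workers }
                }
            }
            // ReportDatabaseFailure / BarrierComplete are ignored while resetting.
            (state @ Resetting { .. }, _) => state,
            (
                Initializing { reset_request_id, mut remaining_workers },
                BarrierComplete { worker },
            ) => {
                remaining_workers.remove(&worker);
                if remaining_workers.is_empty() {
                    Running // all initial barriers collected: recovery finished
                } else {
                    Initializing { reset_request_id, remaining_workers }
                }
            }
            (Initializing { reset_request_id, .. }, ReportDatabaseFailure) => {
                // Re-send ResetDatabaseRequest with an incremented request id.
                Resetting {
                    reset_request_id: reset_request_id + 1,
                    remaining_workers: all_workers.clone(),
                }
            }
            (Initializing { .. }, DatabaseReset { .. }) => unreachable!("reset already acknowledged"),
        }
    }
}

For simplicity, the sketch consumes the old state and returns the next one; the actual implementation appears to mutate the per-database status in place (see the mem::replace discussion further down), but the transition table is the same.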

Note that we use the reset_request_id to handle stale ResetDatabaseResponses, although in the current state machine there should be no stale ResetDatabaseResponse: in the Resetting state, we either enter Initializing once we have collected the ResetDatabaseResponse from all the CNs we sent the request to, or we get an error and enter global recovery.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@wenym1 (Contributor, Author) commented Dec 4, 2024

@@ -979,33 +982,11 @@ impl LocalBarrierManager {
}

/// A [`StreamError`] with a score, used to find the root cause of actor failures.
#[derive(Debug, Clone)]
Moved to src/error/src/tonic/extra.rs.

@hzxa21 (Collaborator) left a comment:
Left some early comments.

if self.checkpoint_control.is_failed_at_worker_err(worker_id) {
let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
self.report_collect_failure(&err);
Self::report_collect_failure(&self.env, &err);
self.failure_recovery(err).await;
Collaborator commented:
Note that, the per-database recovery is only triggered by receiving a ReportDatabaseFailureResponse from CN, and when receiving an error from bidi-stream to any compute node will still trigger global recovery.

With node labels introduced in the future, I think the flow on a bidi-stream failure will become:

  • Get the databases assigned to the failing CN
  • Trigger database recovery for each of those databases

In other words, we will ultimately rely only on database recovery and deprecate the global recovery logic.

Contributor Author (wenym1) commented:

Global recovery may still be triggered in some cases, such as manually triggered recovery, or internal errors of the global barrier manager such as a failure to commit an epoch.

Collaborator commented:
I am not saying that we will no longer have global recovery. My point is that it is better for global recovery to be done by triggering DB recovery for each existing DB.

Collaborator commented:

Discussed offline. In future PRs, the logic for handling a bidi-stream failure will become something similar to:

  • Get the databases assigned to the failing CN
  • Trigger database recovery for each of those databases

The reason this PR still triggers a global recovery is that the CN-side per-database recovery logic is not included here, so to avoid breaking CI we need to keep the global recovery logic in this PR.

Two resolved review threads on src/meta/src/barrier/worker.rs.
if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(resp, &mut self.control_stream_manager) {
    let database_id = entering_recovery.database_id();
    warn!(database_id = database_id.database_id, "database entering recovery");
    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
Collaborator commented:

Should we also make the barrier manager status per-DB? Otherwise, when one DB is under recovery here, the barrier manager check_status_running will return an error, which I guess will affect meta operations or batch queries on other DBs.

Collaborator commented:

cc @yezizp2012 Will it be an issue if we remove the barrier_manager.check_status_running check completely? Previously the check seems to have been used to avoid DDL and scaling (1, 2). However, the check is not 100% accurate, because it is possible that recovery is triggered immediately after the check has passed.

In that case, we still need to rely on the validation in the barrier manager to make sure the corresponding command is not allowed. In other words, if we change to rely only on the barrier manager validation, will it be a problem? We may need to check whether partial metadata can be written before barrier injection and whether this partial metadata will be cleaned up correctly on barrier injection/collection failure.

Resolved review threads on src/meta/src/barrier/checkpoint/recovery.rs and src/meta/src/barrier/worker.rs.
@hzxa21 (Collaborator) left a comment:
Rest LGTM.

I have two more suggestions:

  1. The PR description is awesome and can help people understand the state machine in the code. How about including the description in the code as documentation of GlobalBarrierWorker::run_inner (with references to other code as well)?
  2. The state machine is a bit complicated, so we need to improve observability around it. Let's include the elapsed time and an info log with the necessary information (db_id, state, status action, retry, etc.) before and after each state machine transition (a rough sketch follows the list).
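
One possible shape for such logging, as a hypothetical sketch that is not part of this PR and that assumes the tracing crate already used in the codebase:

use std::fmt::Debug;
use std::time::Instant;

// Hypothetical helper: wrap a state transition with info logs and elapsed time.
fn transition_with_log<S: Debug, E: Debug>(
    database_id: u32,
    state: &mut S,
    event: &E,
    apply: impl FnOnce(&mut S, &E),
) {
    let start = Instant::now();
    tracing::info!(database_id, from = ?state, event = ?event, "database state transition start");
    apply(state, event);
    tracing::info!(
        database_id,
        to = ?state,
        elapsed_ms = start.elapsed().as_millis() as u64,
        "database state transition done"
    );
}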

Comment on lines +384 to +418
let temp_place_holder = DatabaseRecoveringStage::Resetting {
    remaining_workers: Default::default(),
    reset_workers: Default::default(),
    reset_request_id: 0,
    backoff_future: None,
};
match replace(&mut state.stage, temp_place_holder) {
Collaborator commented:
nit: Why do we need the replace here? The whole database_status will be reset in L400 anyway, so I guess we only need to match on state.stage here?

@hzxa21 (Collaborator) commented Dec 10, 2024

cc @shanicky @st1page for awareness. In this PR, global recovery is still triggered when the CN-meta RPC fails, which can happen when a CN crashes. In the future we will trigger DB recovery instead of global recovery in such cases, and global recovery will only be triggered on manual recovery or errors inside meta. See context here.

Comparing the metadata recovery logic for DB recovery and global recovery, there is one missing piece in the former:

Per-DB scale_actors / migrate_actors are not supported. I think we will need them to achieve isolation of node failures between DBs (i.e. resource groups), because otherwise we can only trigger global recovery on node failure.

@st1page (Contributor) commented Dec 11, 2024

isolation on node failure between DB (i.e. resource group)

We need to determine whether we need database-level recovery or resource-group-level recovery.
Database and resource group still have some slight differences here. In our design, a resource group can contain multiple databases, so database-level recovery is more granular than resource-group-level recovery.
However, for a node failure, all the databases in a resource group must be affected, so there is no difference between the two levels of recovery in that case. The scenarios that database-level recovery can additionally handle are errors thrown by streaming execution.
I originally preferred the database level because I felt it wouldn't require extra work, but after writing this I realized it would require a lot of work, such as additional support on each compute node, so I'm leaning towards doing resource-group recovery first.

cc @shanicky

@hzxa21 (Collaborator) commented Dec 11, 2024

I originally preferred the database level because I felt it wouldn't require extra work, but after writing this I realized it would require a lot of work, such as additional support on each compute node, so I'm leaning towards doing resource-group recovery first.

Can you elaborate on why it requires additional support on each CN to support per-DB recovery/actor migration? Currently I think we make a strong assumption that there are no edges between objects in different DBs, but I am not sure whether the same assumption holds for resource groups. Also, metadata like barrier commands and the catalog are bound to a DB id, so I think it is actually more natural to do per-DB recovery.

@wenym1 force-pushed the yiming/database-failure-isolation-meta-part branch from 7baa0d1 to 07de9ce on December 11, 2024 at 05:57
@wenym1 added this pull request to the merge queue on Dec 11, 2024
Merged via the queue into main with commit 61d4a79 on Dec 11, 2024
34 of 35 checks passed
@wenym1 deleted the yiming/database-failure-isolation-meta-part branch on December 11, 2024 at 08:15
3 participants