Skip to content

Commit

Permalink
feat(barrier): support database failure isolation (part 1, meta) (#19664
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wenym1 authored Dec 11, 2024
1 parent f945379 commit 61d4a79
Show file tree
Hide file tree
Showing 13 changed files with 885 additions and 122 deletions.
22 changes: 22 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,44 @@ message StreamingControlStreamRequest {
uint32 database_id = 2;
}

message ResetDatabaseRequest {
uint32 database_id = 1;
uint32 reset_request_id = 2;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
CreatePartialGraphRequest create_partial_graph = 4;
ResetDatabaseRequest reset_database = 5;
}
}

message ScoredError {
string err_msg = 1;
int32 score = 2;
}

message StreamingControlStreamResponse {
message InitResponse {}
message ShutdownResponse {}
message ReportDatabaseFailureResponse {
uint32 database_id = 1;
}

message ResetDatabaseResponse {
uint32 database_id = 1;
optional ScoredError root_err = 2;
uint32 reset_request_id = 3;
}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
ShutdownResponse shutdown = 3;
ReportDatabaseFailureResponse report_database_failure = 4;
ResetDatabaseResponse reset_database = 5;
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/error/src/tonic/extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Score(pub i32);

/// A error with a score, used to find the root cause of multiple failures.
#[derive(Debug, Clone)]
pub struct ScoredError<E> {
pub error: E,
pub score: Score,
}

impl<E: std::fmt::Display> std::fmt::Display for ScoredError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.error.fmt(f)
}
}

impl<E: std::error::Error> std::error::Error for ScoredError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.error.source()
}

fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
self.error.provide(request);
// HIGHLIGHT: Provide the score to make it retrievable from meta service.
request.provide_value(self.score);
}
}

/// Extra fields in errors that can be passed through the gRPC boundary.
///
/// - Field being set to `None` means it is not available.
Expand Down
Loading

0 comments on commit 61d4a79

Please sign in to comment.