This is an automated cherry-pick of tikv#16071
close tikv#16056, close tikv#16070

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
YuJuncen authored and ti-chi-bot committed Nov 30, 2023
1 parent 693a450 commit 9b34274
Showing 7 changed files with 1,145 additions and 50 deletions.
100 changes: 53 additions & 47 deletions components/backup-stream/src/endpoint.rs
@@ -215,6 +215,53 @@ where
self.meta_client.clone()
}

fn on_fatal_error_of_task(&self, task: &str, err: &Error) -> future![()] {
metrics::update_task_status(TaskStatus::Error, task);
let meta_cli = self.get_meta_client();
let pdc = self.pd_client.clone();
let store_id = self.store_id;
let sched = self.scheduler.clone();
let safepoint_name = self.pause_guard_id_for_task(task);
let safepoint_ttl = self.pause_guard_duration();
let code = err.error_code().code.to_owned();
let msg = err.to_string();
let task = task.to_owned();
async move {
let err_fut = async {
let safepoint = meta_cli.global_progress_of_task(&task).await?;
pdc.update_service_safe_point(
safepoint_name,
TimeStamp::new(safepoint.saturating_sub(1)),
safepoint_ttl,
)
.await?;
meta_cli.pause(&task).await?;
let mut last_error = StreamBackupError::new();
last_error.set_error_code(code);
last_error.set_error_message(msg.clone());
last_error.set_store_id(store_id);
last_error.set_happen_at(TimeStamp::physical_now());
meta_cli.report_last_error(&task, last_error).await?;
Result::Ok(())
};
if let Err(err_report) = err_fut.await {
err_report.report(format_args!("failed to upload error {}", err_report));
let name = task.to_owned();
// Let's retry reporting after 5s.
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
try_send!(
sched,
Task::FatalError(
TaskSelector::ByName(name),
Box::new(annotate!(err_report, "origin error: {}", msg))
)
);
});
}
}
}

fn on_fatal_error(&self, select: TaskSelector, err: Box<Error>) {
err.report_fatal();
let tasks = self
@@ -224,49 +271,7 @@ where
for task in tasks {
// Let's pause the task first.
self.unload_task(&task);
metrics::update_task_status(TaskStatus::Error, &task);

let meta_cli = self.get_meta_client();
let pdc = self.pd_client.clone();
let store_id = self.store_id;
let sched = self.scheduler.clone();
let safepoint_name = self.pause_guard_id_for_task(&task);
let safepoint_ttl = self.pause_guard_duration();
let code = err.error_code().code.to_owned();
let msg = err.to_string();
self.pool.block_on(async move {
let err_fut = async {
let safepoint = meta_cli.global_progress_of_task(&task).await?;
pdc.update_service_safe_point(
safepoint_name,
TimeStamp::new(safepoint.saturating_sub(1)),
safepoint_ttl,
)
.await?;
meta_cli.pause(&task).await?;
let mut last_error = StreamBackupError::new();
last_error.set_error_code(code);
last_error.set_error_message(msg.clone());
last_error.set_store_id(store_id);
last_error.set_happen_at(TimeStamp::physical_now());
meta_cli.report_last_error(&task, last_error).await?;
Result::Ok(())
};
if let Err(err_report) = err_fut.await {
err_report.report(format_args!("failed to upload error {}", err_report));
// Let's retry reporting after 5s.
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
try_send!(
sched,
Task::FatalError(
TaskSelector::ByName(task.to_owned()),
Box::new(annotate!(err_report, "origin error: {}", msg))
)
);
});
}
});
self.pool.block_on(self.on_fatal_error_of_task(&task, &err));
}
}
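The refactor above extracts the error-reporting body of `on_fatal_error` into `on_fatal_error_of_task`, which builds the work as a future instead of running it, so the synchronous caller here can `block_on` it while the async task-loading path later in this diff simply `.await`s it. A minimal, self-contained sketch of that pattern (all names are illustrative, not TiKV's; it uses the `futures` crate's `block_on`):

use std::future::Future;

struct Endpoint {
    store_id: u64,
}

impl Endpoint {
    // Build the reporting work as a future without executing it; the caller
    // decides whether to block on it or await it.
    fn report_error(&self, task: &str, msg: &str) -> impl Future<Output = ()> {
        // Clone everything the future needs so it does not borrow `self`.
        let task = task.to_owned();
        let msg = msg.to_owned();
        let store_id = self.store_id;
        async move {
            // The real code updates the service safepoint, pauses the task,
            // and uploads the last error here.
            println!("store {store_id}: task {task} failed: {msg}");
        }
    }
}

fn main() {
    let ep = Endpoint { store_id: 1 };
    // A synchronous caller (like `on_fatal_error`) blocks on the future:
    futures::executor::block_on(ep.report_error("t1", "disk full"));
    // An async caller (like the task-loading path below) would `.await` it instead.
}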

@@ -641,6 +646,9 @@ where
let run = async move {
let task_name = task.info.get_name();
let ranges = cli.ranges_of_task(task_name).await?;
fail::fail_point!("load_task::error_when_fetching_ranges", |_| {
Err(Error::Other("what range? no such thing, go away.".into()))
});
info!(
"register backup stream ranges";
"task" => ?task,
@@ -668,10 +676,8 @@ where
Result::Ok(())
};
if let Err(e) = run.await {
e.report(format!(
"failed to register backup stream task {} to router: ranges not found",
task_clone.info.get_name()
));
self.on_fatal_error_of_task(&task_clone.info.name, &Box::new(e))
.await;
}
});
metrics::update_task_status(TaskStatus::Running, &task_name);
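The endpoint.rs changes also add a `load_task::error_when_fetching_ranges` failpoint so tests can force the range-fetching step to fail and exercise the new fatal-error path on task registration. Assuming the standard `fail` crate TiKV uses, a test would arm it roughly like this (a sketch, not code from this commit):

#[test]
fn load_task_reports_fatal_error_when_ranges_fail() {
    // "return" makes the closure form of fail_point! above take the error branch.
    fail::cfg("load_task::error_when_fetching_ranges", "return").unwrap();

    // ... drive the endpoint to load a task here, then assert the task ends up
    // paused with a last error recorded (see the metadata client change below) ...

    // Disarm the failpoint so other tests are unaffected.
    fail::remove("load_task::error_when_fetching_ranges");
}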
14 changes: 13 additions & 1 deletion components/backup-stream/src/metadata/client.rs
@@ -286,7 +286,19 @@ impl<Store: MetaStore> MetadataClient<Store> {
Ok(())
}

pub async fn get_last_error(
pub async fn get_last_error(&self, name: &str) -> Result<Option<StreamBackupError>> {
let key = MetaKey::last_errors_of(name);

let r = self.meta_store.get_latest(Keys::Prefix(key)).await?.inner;
if r.is_empty() {
return Ok(None);
}
let r = &r[0];
let err = protobuf::parse_from_bytes(r.value())?;
Ok(Some(err))
}

pub async fn get_last_error_of(
&self,
name: &str,
store_id: u64,
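The metadata client change renames the per-store lookup to `get_last_error_of` and adds `get_last_error`, which returns the most recent error recorded for a task by any store. A hedged usage sketch, assuming an async context that already holds a `MetadataClient` named `cli`:

// `StreamBackupError` is the kvproto message filled in by `report_last_error`
// in endpoint.rs above, so its getters mirror the setters used there.
if let Some(err) = cli.get_last_error("my-task").await? {
    println!(
        "task paused by store {}: {} ({})",
        err.get_store_id(),
        err.get_error_message(),
        err.get_error_code()
    );
}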
34 changes: 34 additions & 0 deletions components/backup-stream/src/router.rs
@@ -820,6 +820,28 @@ impl Drop for StreamTaskInfo {
}
}

impl Drop for StreamTaskInfo {
fn drop(&mut self) {
let (success, failed): (Vec<_>, Vec<_>) = self
.flushing_files
.get_mut()
.drain(..)
.chain(self.flushing_meta_files.get_mut().drain(..))
.map(|(_, f, _)| f.inner.path().to_owned())
.map(|p| self.temp_file_pool.remove(&p))
.partition(|r| *r);
info!("stream task info dropped[1/2], removing flushing_temp files"; "success" => %success.len(), "failure" => %failed.len());
let (success, failed): (Vec<_>, Vec<_>) = self
.files
.get_mut()
.drain()
.map(|(_, f)| f.into_inner().inner.path().to_owned())
.map(|p| self.temp_file_pool.remove(&p))
.partition(|r| *r);
info!("stream task info dropped[2/2], removing temp files"; "success" => %success.len(), "failure" => %failed.len());
}
}

impl std::fmt::Debug for StreamTaskInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamTaskInfo")
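The new `Drop` implementation above drains both the flushing and pending file maps, asks the temp file pool to remove each backing file, and splits the boolean results into successes and failures for the log lines. The splitting step is the standard-library partition pattern, shown here in isolation:

fn main() {
    // Stand-ins for the booleans returned by `temp_file_pool.remove(&path)`.
    let removal_results = vec![true, true, false, true];
    let (success, failed): (Vec<bool>, Vec<bool>) =
        removal_results.into_iter().partition(|removed| *removed);
    println!(
        "removed {} temp files, {} already gone or failed to remove",
        success.len(),
        failed.len()
    );
}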
@@ -2045,6 +2067,12 @@ mod tests {
let (task, _path) = task("cleanup_test".to_owned()).await?;
must_register_table(&router, task, 1).await;
write_simple_data(&router).await;
let tempfiles = router
.get_task_info("cleanup_test")
.await
.unwrap()
.temp_file_pool
.clone();
router
.get_task_info("cleanup_test")
.await?
@@ -2053,6 +2081,7 @@ mod tests {
write_simple_data(&router).await;
let mut w = walkdir::WalkDir::new(&tmp).into_iter();
assert!(w.next().is_some(), "the temp files doesn't created");
assert!(tempfiles.mem_used() > 0, "the temp files doesn't created.");
drop(router);
let w = walkdir::WalkDir::new(&tmp)
.into_iter()
@@ -2070,6 +2099,11 @@ mod tests {
"the temp files should be removed, but it is {:?}",
w
);
assert_eq!(
tempfiles.mem_used(),
0,
"the temp files hasn't been cleared."
);
Ok(())
}

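The test additions keep a clone of the task's `temp_file_pool` before the router is dropped, pairing the existing on-disk check with an in-memory one: `mem_used()` must be non-zero while data is buffered and must fall back to zero once the `Drop` impl above has run. A toy stand-in for that accounting pattern (not the router's real temp-file-pool API):

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// A shared pool with memory accounting, plus an owner that releases its usage
// on drop, mirroring what `Drop for StreamTaskInfo` does above.
struct Pool {
    mem: AtomicUsize,
}

struct Owner {
    pool: Arc<Pool>,
    buffered: usize,
}

impl Drop for Owner {
    fn drop(&mut self) {
        self.pool.mem.fetch_sub(self.buffered, Ordering::SeqCst);
    }
}

fn main() {
    let pool = Arc::new(Pool { mem: AtomicUsize::new(0) });
    pool.mem.fetch_add(4096, Ordering::SeqCst);
    let owner = Owner { pool: Arc::clone(&pool), buffered: 4096 };

    assert!(pool.mem.load(Ordering::SeqCst) > 0, "temp data is buffered");
    drop(owner); // Drop returns the usage to the pool.
    assert_eq!(pool.mem.load(Ordering::SeqCst), 0, "everything was cleaned up");
}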
(Diffs for the remaining 4 changed files are not shown.)
