diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index 34dee0ac569..f948c9eb2f6 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -31,7 +31,6 @@ use crate::util::Either; use crate::coprocessor::dag::executor::ExecutorMetrics; use crate::coprocessor::metrics::*; use crate::coprocessor::tracker::Tracker; -use crate::coprocessor::util as cop_util; use crate::coprocessor::*; const OUTDATED_ERROR_MSG: &str = "request outdated."; @@ -83,12 +82,16 @@ impl Endpoint { /// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`. /// Returns `Err` if fails. - fn try_parse_request( + fn parse_request( &self, mut req: coppb::Request, peer: Option, is_streaming: bool, ) -> Result<(RequestHandlerBuilder, ReqContext)> { + fail_point!("coprocessor_parse_request", |_| Err(box_err!( + "unsupported tp (failpoint)" + ))); + let (context, data, ranges) = ( req.take_context(), req.take_data(), @@ -187,34 +190,6 @@ impl Endpoint { Ok((builder, req_ctx)) } - /// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`. - #[inline] - fn parse_request( - &self, - req: coppb::Request, - peer: Option, - is_streaming: bool, - ) -> (RequestHandlerBuilder, ReqContext) { - match self.try_parse_request(req, peer, is_streaming) { - Ok(v) => v, - Err(err) => { - // If there are errors when parsing requests, create a dummy request handler. - let builder = - Box::new(|_, _: &_| Ok(cop_util::ErrorRequestHandler::new(err).into_boxed())); - let req_ctx = ReqContext::new( - "invalid", - kvrpcpb::Context::new(), - &[], - Duration::from_secs(60), // Large enough to avoid becoming outdated error - None, - None, - None, - ); - (builder, req_ctx) - } - } - } - /// Get the batch row limit configuration. #[inline] fn get_batch_row_limit(&self, is_streaming: bool) -> usize { @@ -293,39 +268,46 @@ impl Endpoint { }) } - /// Handle a unary request and run on the read pool. Returns a future producing the - /// result, which must be a `Response` and will never fail. If there are errors during - /// handling, they will be embedded in the `Response`. + /// Handle a unary request and run on the read pool. + /// + /// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases. + /// The future inside may be an error however. fn handle_unary_request( &self, req_ctx: ReqContext, handler_builder: RequestHandlerBuilder, - ) -> impl Future { + ) -> Result> { let engine = self.engine.clone(); let priority = readpool::Priority::from(req_ctx.context.get_priority()); + // box the tracker so that moving it is cheap. let mut tracker = Box::new(Tracker::new(req_ctx)); - let result = self.read_pool.future_execute(priority, move |ctxd| { - tracker.attach_ctxd(ctxd); - - Self::handle_unary_request_impl(engine, tracker, handler_builder) - }); - - future::result(result) - // If the read pool is full, an error response will be returned directly. + self.read_pool + .future_execute(priority, move |ctxd| { + tracker.attach_ctxd(ctxd); + Self::handle_unary_request_impl(engine, tracker, handler_builder) + }) .map_err(|_| Error::Full) - .flatten() - .or_else(|e| Ok(make_error_response(e))) } + /// Parses and handles a unary request. Returns a future that will never fail. If there are + /// errors during parsing or handling, they will be converted into a `Response` as the success + /// result of the future. #[inline] pub fn parse_and_handle_unary_request( &self, req: coppb::Request, peer: Option, ) -> impl Future { - let (handler_builder, req_ctx) = self.parse_request(req, peer, false); - self.handle_unary_request(req_ctx, handler_builder) + let result_of_future = + self.parse_request(req, peer, false) + .and_then(|(handler_builder, req_ctx)| { + self.handle_unary_request(req_ctx, handler_builder) + }); + + future::result(result_of_future) + .flatten() + .or_else(|e| Ok(make_error_response(e))) } /// The real implementation of handling a stream request. @@ -422,62 +404,57 @@ impl Endpoint { .flatten_stream() } - /// Handle a stream request and run on the read pool. Returns a stream producing each - /// result, which must be a `Response` and will never fail. If there are errors during - /// handling, they will be embedded in the `Response`. + /// Handle a stream request and run on the read pool. + /// + /// Returns `Err(err)` if the read pool is full. Returns `Ok(stream)` in other cases. + /// The stream inside may produce errors however. fn handle_stream_request( &self, req_ctx: ReqContext, handler_builder: RequestHandlerBuilder, - ) -> impl Stream { - let (tx, rx) = mpsc::channel::(self.stream_channel_size); + ) -> Result> { + let (tx, rx) = mpsc::channel::>(self.stream_channel_size); let engine = self.engine.clone(); let priority = readpool::Priority::from(req_ctx.context.get_priority()); // Must be created befure `future_execute`, otherwise wait time is not tracked. let mut tracker = Box::new(Tracker::new(req_ctx)); - let tx1 = tx.clone(); - let result = self.read_pool.future_execute(priority, move |ctxd| { - tracker.attach_ctxd(ctxd); - - Self::handle_stream_request_impl(engine, tracker, handler_builder) - .or_else(|e| Ok::<_, mpsc::SendError<_>>(make_error_response(e))) - // Although returning `Ok()` from `or_else` will continue the stream, - // our stream has already ended when error is returned. - // Thus the stream will not continue any more even after we converting errors - // into a response. - .forward(tx1) - }); + self.read_pool + .future_execute(priority, move |ctxd| { + tracker.attach_ctxd(ctxd); - match result { - Err(_) => { - stream::once::<_, mpsc::SendError<_>>(Ok(make_error_response(Error::Full))) + Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream + .then(Ok::<_, mpsc::SendError<_>>) // Stream, MpscError> .forward(tx) - .then(|_| { - // ignore sink send failures - Ok::<_, ()>(()) - }) - // Should not be blocked, since the channel is large enough to hold 1 value. - .wait() - .unwrap(); - } - Ok(cpu_future) => { + }) + .map_err(|_| Error::Full) + .and_then(move |cpu_future| { // Keep running stream producer cpu_future.forget(); - } - } - rx + // Returns the stream instead of a future + Ok(rx.then(|r| r.unwrap())) + }) } + /// Parses and handles a stream request. Returns a stream that produce each result in a + /// `Response` and will never fail. If there are errors during parsing or handling, they will + /// be converted into a `Response` as the only stream item. #[inline] pub fn parse_and_handle_stream_request( &self, req: coppb::Request, peer: Option, ) -> impl Stream { - let (handler_builder, req_ctx) = self.parse_request(req, peer, true); - self.handle_stream_request(req_ctx, handler_builder) + let result_of_stream = + self.parse_request(req, peer, true) + .and_then(|(handler_builder, req_ctx)| { + self.handle_stream_request(req_ctx, handler_builder) + }); // Result, Error> + + stream::once(result_of_stream) // Stream, Error> + .flatten() // Stream + .or_else(|e| Ok(make_error_response(e))) // Stream } } @@ -676,6 +653,7 @@ mod tests { Box::new(|_, _: &_| Ok(UnaryFixture::new(Ok(coppb::Response::new())).into_boxed())); let resp = cop .handle_unary_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .wait() .unwrap(); assert!(resp.get_other_error().is_empty()); @@ -692,11 +670,11 @@ mod tests { None, None, ); - let resp = cop + assert!(cop .handle_unary_request(outdated_req_ctx, handler_builder) + .unwrap() .wait() - .unwrap(); - assert_eq!(resp.get_other_error(), OUTDATED_ERROR_MSG); + .is_err()); } #[test] @@ -806,25 +784,28 @@ mod tests { let handler_builder = Box::new(|_, _: &_| { Ok(UnaryFixture::new_with_duration(Ok(response), 1000).into_boxed()) }); - let future = cop.handle_unary_request(ReqContext::default_for_test(), handler_builder); - let tx = tx.clone(); - thread::spawn(move || tx.send(future.wait().unwrap())); + let result_of_future = + cop.handle_unary_request(ReqContext::default_for_test(), handler_builder); + match result_of_future { + Err(full_error) => { + tx.send(Err(full_error)).unwrap(); + } + Ok(future) => { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(future.wait()).unwrap(); + }); + } + } thread::sleep(Duration::from_millis(100)); } // verify for _ in 2..5 { - let resp: coppb::Response = rx.recv().unwrap(); - assert_eq!(resp.get_data().len(), 0); - assert!(resp.has_region_error()); - assert!(resp.get_region_error().has_server_is_busy()); - assert_eq!( - resp.get_region_error().get_server_is_busy().get_reason(), - BUSY_ERROR_MSG - ); + assert!(rx.recv().unwrap().is_err()); } for i in 0..2 { - let resp = rx.recv().unwrap(); + let resp = rx.recv().unwrap().unwrap(); assert_eq!(resp.get_data(), [1, 2, i]); assert!(!resp.has_region_error()); } @@ -844,6 +825,7 @@ mod tests { }); let resp = cop .handle_unary_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .wait() .unwrap(); assert_eq!(resp.get_data().len(), 0); @@ -865,6 +847,7 @@ mod tests { }); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -884,6 +867,7 @@ mod tests { let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(responses).into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -907,6 +891,7 @@ mod tests { let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -942,6 +927,7 @@ mod tests { let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -967,6 +953,7 @@ mod tests { let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -992,6 +979,7 @@ mod tests { let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -1029,6 +1017,7 @@ mod tests { let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed())); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .take(7) .collect() .wait() @@ -1086,8 +1075,9 @@ mod tests { ) .into_boxed()) }); - let resp_future_1 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_1 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap()); // Sleep a while to make sure that thread is spawn and snapshot is taken. @@ -1100,8 +1090,9 @@ mod tests { .into_boxed(), ) }); - let resp_future_2 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_2 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_2.wait().unwrap()]).unwrap()); thread::sleep(Duration::from_millis(SNAPSHOT_DURATION_MS as u64)); @@ -1159,8 +1150,9 @@ mod tests { ) .into_boxed()) }); - let resp_future_1 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_1 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap()); // Sleep a while to make sure that thread is spawn and snapshot is taken. @@ -1182,8 +1174,9 @@ mod tests { ) .into_boxed()) }); - let resp_future_3 = - cop.handle_stream_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_3 = cop + .handle_stream_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || { sender diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs index b23b3f92e0d..b94f439ccca 100644 --- a/src/coprocessor/mod.rs +++ b/src/coprocessor/mod.rs @@ -110,6 +110,10 @@ impl Deadline { /// Returns error if the deadline is exceeded. pub fn check_if_exceeded(&self) -> Result<()> { + fail_point!("coprocessor_deadline_check_exceeded", |_| Err( + Error::Outdated(Duration::from_secs(60), self.tag) + )); + let now = Instant::now_coarse(); if self.deadline <= now { let elapsed = now.duration_since(self.start_time); diff --git a/src/coprocessor/util.rs b/src/coprocessor/util.rs index f71978c00af..92eb9d9685b 100644 --- a/src/coprocessor/util.rs +++ b/src/coprocessor/util.rs @@ -15,27 +15,6 @@ use kvproto::coprocessor as coppb; use tipb::schema::ColumnInfo; use crate::coprocessor::codec::datum::Datum; -use crate::coprocessor::*; - -/// A `RequestHandler` that always produces errors. -pub struct ErrorRequestHandler { - error: Option, -} - -impl ErrorRequestHandler { - pub fn new(error: Error) -> ErrorRequestHandler { - ErrorRequestHandler { error: Some(error) } - } -} - -impl RequestHandler for ErrorRequestHandler { - fn handle_request(&mut self) -> Result { - Err(self.error.take().unwrap()) - } - fn handle_streaming_request(&mut self) -> Result<(Option, bool)> { - Err(self.error.take().unwrap()) - } -} /// Convert the key to the smallest key which is larger than the key given. pub fn convert_to_prefix_next(key: &mut Vec) { diff --git a/src/server/readpool/mod.rs b/src/server/readpool/mod.rs index 291f67126c7..2995bc7a41c 100644 --- a/src/server/readpool/mod.rs +++ b/src/server/readpool/mod.rs @@ -124,6 +124,11 @@ impl ReadPool { F::Item: Send + 'static, F::Error: Send + 'static, { + fail_point!("read_pool_execute_full", |_| Err(Full { + current_tasks: 100, + max_tasks: 100, + })); + let pool = self.get_pool_by_priority(priority); let max_tasks = self.get_max_tasks_by_priority(priority); let current_tasks = pool.get_running_task_count(); diff --git a/src/storage/engine/rocksdb_engine.rs b/src/storage/engine/rocksdb_engine.rs index deab0c01578..fc5c2b7614b 100644 --- a/src/storage/engine/rocksdb_engine.rs +++ b/src/storage/engine/rocksdb_engine.rs @@ -16,6 +16,7 @@ use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use kvproto::errorpb::Error as ErrorHeader; use kvproto::kvrpcpb::Context; use tempdir::TempDir; @@ -249,6 +250,14 @@ impl Engine for RocksEngine { } fn async_snapshot(&self, _: &Context, cb: Callback) -> Result<()> { + fail_point!("rockskv_async_snapshot", |_| Err(box_err!( + "snapshot failed" + ))); + fail_point!("rockskv_async_snapshot_not_leader", |_| { + let mut header = ErrorHeader::new(); + header.mut_not_leader().set_region_id(100); + Err(Error::Request(header)) + }); box_try!(self.sched.schedule(Task::Snapshot(cb))); Ok(()) } diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index f4494cf2f62..8484a7c1a77 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -13,6 +13,7 @@ mod test_bootstrap; mod test_conf_change; +mod test_coprocessor; mod test_merge; mod test_pending_peers; mod test_snap; diff --git a/tests/failpoints/cases/test_coprocessor.rs b/tests/failpoints/cases/test_coprocessor.rs new file mode 100644 index 00000000000..35065fc3a99 --- /dev/null +++ b/tests/failpoints/cases/test_coprocessor.rs @@ -0,0 +1,118 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use test_coprocessor::*; + +#[test] +fn test_outdated() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("coprocessor_deadline_check_exceeded", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("request outdated")); +} + +#[test] +fn test_outdated_2() { + // It should not even take any snapshots when request is outdated from the beginning. + + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "panic").unwrap(); + fail::cfg("coprocessor_deadline_check_exceeded", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("request outdated")); +} + +#[test] +fn test_parse_request_failed() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("coprocessor_parse_request", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("unsupported tp")); +} + +#[test] +fn test_parse_request_failed_2() { + // It should not even take any snapshots when parse failed. + + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "panic").unwrap(); + fail::cfg("coprocessor_parse_request", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("unsupported tp")); +} + +#[test] +fn test_readpool_full() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("read_pool_execute_full", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_region_error().has_server_is_busy()); +} + +#[test] +fn test_snapshot_failed() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("snapshot failed")); +} + +#[test] +fn test_snapshot_failed_2() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot_not_leader", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_region_error().has_not_leader()); +}