Remove ErrorRequestHandler (tikv#4303)
Signed-off-by: Breezewish <breezewish@pingcap.com>
breezewish authored and overvenus committed Mar 14, 2019
1 parent 970323e commit eed94db
Showing 7 changed files with 233 additions and 124 deletions.
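With `ErrorRequestHandler` removed, a failed parse is no longer wrapped in a dummy request handler. Instead, `parse_request` and `handle_unary_request` return `Result`, and only the outermost `parse_and_handle_unary_request` turns an `Error` into a `Response` through `make_error_response`, via the `future::result(..).flatten().or_else(..)` chain visible in the diff below. The following is a minimal sketch of that shape, not TiKV code: `Response`, `Error`, `parse`, and `handle` are stand-ins, and it assumes the futures 0.1 API this file uses.

```rust
extern crate futures; // futures 0.1

use futures::{future, Future};

#[derive(Debug)]
struct Response(String);

#[derive(Debug)]
enum Error {
    Full,
    Other(String),
}

// The only place errors become responses, mirroring `make_error_response`.
fn make_error_response(e: Error) -> Response {
    Response(format!("error: {:?}", e))
}

// Inner layer: parsing may fail.
fn parse(req: &str) -> Result<&str, Error> {
    if req.is_empty() {
        Err(Error::Other("unsupported tp".to_owned()))
    } else {
        Ok(req)
    }
}

// Middle layer: may fail immediately (e.g. read pool full) or later inside the future.
fn handle(parsed: &str) -> Result<impl Future<Item = Response, Error = Error>, Error> {
    if parsed == "busy" {
        return Err(Error::Full);
    }
    Ok(future::ok(Response(parsed.to_owned())))
}

// Outermost layer: flatten the nested Result/Future and convert errors once.
fn parse_and_handle(req: &str) -> impl Future<Item = Response, Error = ()> {
    let result_of_future = parse(req).and_then(handle); // Result<impl Future<_, Error>, Error>
    future::result(result_of_future)
        .flatten()                               // Future<Response, Error>
        .or_else(|e| Ok(make_error_response(e))) // Future<Response, ()>
}

fn main() {
    println!("{:?}", parse_and_handle("select 1").wait().unwrap());
    println!("{:?}", parse_and_handle("").wait().unwrap());     // parse error -> error response
    println!("{:?}", parse_and_handle("busy").wait().unwrap()); // Error::Full -> error response
}
```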
199 changes: 96 additions & 103 deletions src/coprocessor/endpoint.rs
@@ -31,7 +31,6 @@ use crate::util::Either;
use crate::coprocessor::dag::executor::ExecutorMetrics;
use crate::coprocessor::metrics::*;
use crate::coprocessor::tracker::Tracker;
use crate::coprocessor::util as cop_util;
use crate::coprocessor::*;

const OUTDATED_ERROR_MSG: &str = "request outdated.";
@@ -83,12 +82,16 @@ impl<E: Engine> Endpoint<E> {

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
/// Returns `Err` if parsing fails.
fn try_parse_request(
fn parse_request(
&self,
mut req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> Result<(RequestHandlerBuilder<E::Snap>, ReqContext)> {
fail_point!("coprocessor_parse_request", |_| Err(box_err!(
"unsupported tp (failpoint)"
)));

let (context, data, ranges) = (
req.take_context(),
req.take_data(),
@@ -187,34 +190,6 @@ impl<E: Engine> Endpoint<E> {
Ok((builder, req_ctx))
}

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
#[inline]
fn parse_request(
&self,
req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> (RequestHandlerBuilder<E::Snap>, ReqContext) {
match self.try_parse_request(req, peer, is_streaming) {
Ok(v) => v,
Err(err) => {
// If there are errors when parsing requests, create a dummy request handler.
let builder =
Box::new(|_, _: &_| Ok(cop_util::ErrorRequestHandler::new(err).into_boxed()));
let req_ctx = ReqContext::new(
"invalid",
kvrpcpb::Context::new(),
&[],
Duration::from_secs(60), // Large enough to avoid becoming outdated error
None,
None,
None,
);
(builder, req_ctx)
}
}
}

/// Get the batch row limit configuration.
#[inline]
fn get_batch_row_limit(&self, is_streaming: bool) -> usize {
@@ -293,39 +268,46 @@ impl<E: Engine> Endpoint<E> {
})
}

/// Handle a unary request and run on the read pool. Returns a future producing the
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a unary request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases.
/// However, the returned future may still resolve to an error.
fn handle_unary_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Future<Item = coppb::Response, Error = ()> {
) -> Result<impl Future<Item = coppb::Response, Error = Error>> {
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// box the tracker so that moving it is cheap.
let mut tracker = Box::new(Tracker::new(req_ctx));

let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_unary_request_impl(engine, tracker, handler_builder)
});

future::result(result)
// If the read pool is full, an error response will be returned directly.
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);
Self::handle_unary_request_impl(engine, tracker, handler_builder)
})
.map_err(|_| Error::Full)
.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// Parses and handles a unary request. Returns a future that will never fail. If there are
/// errors during parsing or handling, they will be converted into a `Response` as the success
/// result of the future.
#[inline]
pub fn parse_and_handle_unary_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Future<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, false);
self.handle_unary_request(req_ctx, handler_builder)
let result_of_future =
self.parse_request(req, peer, false)
.and_then(|(handler_builder, req_ctx)| {
self.handle_unary_request(req_ctx, handler_builder)
});

future::result(result_of_future)
.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// The real implementation of handling a stream request.
@@ -422,62 +404,57 @@ impl<E: Engine> Endpoint<E> {
.flatten_stream()
}

/// Handle a stream request and run on the read pool. Returns a stream producing each
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a stream request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(stream)` in other cases.
/// However, the returned stream may still produce errors.
fn handle_stream_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (tx, rx) = mpsc::channel::<coppb::Response>(self.stream_channel_size);
) -> Result<impl Stream<Item = coppb::Response, Error = Error>> {
let (tx, rx) = mpsc::channel::<Result<coppb::Response>>(self.stream_channel_size);
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// Must be created before `future_execute`, otherwise wait time is not tracked.
let mut tracker = Box::new(Tracker::new(req_ctx));

let tx1 = tx.clone();
let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_stream_request_impl(engine, tracker, handler_builder)
.or_else(|e| Ok::<_, mpsc::SendError<_>>(make_error_response(e)))
// Although returning `Ok()` from `or_else` will continue the stream,
// our stream has already ended when an error is returned.
// Thus the stream will not continue any more even after we convert errors
// into a response.
.forward(tx1)
});
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

match result {
Err(_) => {
stream::once::<_, mpsc::SendError<_>>(Ok(make_error_response(Error::Full)))
Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream<Resp, Error>
.then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<Resp, Error>, MpscError>
.forward(tx)
.then(|_| {
// ignore sink send failures
Ok::<_, ()>(())
})
// Should not be blocked, since the channel is large enough to hold 1 value.
.wait()
.unwrap();
}
Ok(cpu_future) => {
})
.map_err(|_| Error::Full)
.and_then(move |cpu_future| {
// Keep running stream producer
cpu_future.forget();
}
}

rx
// Returns the stream instead of a future
Ok(rx.then(|r| r.unwrap()))
})
}

/// Parses and handles a stream request. Returns a stream that produces each result in a
/// `Response` and will never fail. If there are errors during parsing or handling, they will
/// be converted into a `Response` as the only stream item.
#[inline]
pub fn parse_and_handle_stream_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, true);
self.handle_stream_request(req_ctx, handler_builder)
let result_of_stream =
self.parse_request(req, peer, true)
.and_then(|(handler_builder, req_ctx)| {
self.handle_stream_request(req_ctx, handler_builder)
}); // Result<Stream<Resp, Error>, Error>

stream::once(result_of_stream) // Stream<Stream<Resp, Error>, Error>
.flatten() // Stream<Resp, Error>
.or_else(|e| Ok(make_error_response(e))) // Stream<Resp, ()>
}
}

@@ -676,6 +653,7 @@ mod tests {
Box::new(|_, _: &_| Ok(UnaryFixture::new(Ok(coppb::Response::new())).into_boxed()));
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert!(resp.get_other_error().is_empty());
@@ -692,11 +670,11 @@ mod tests {
None,
None,
);
let resp = cop
assert!(cop
.handle_unary_request(outdated_req_ctx, handler_builder)
.unwrap()
.wait()
.unwrap();
assert_eq!(resp.get_other_error(), OUTDATED_ERROR_MSG);
.is_err());
}

#[test]
@@ -806,25 +784,28 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| {
Ok(UnaryFixture::new_with_duration(Ok(response), 1000).into_boxed())
});
let future = cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
let tx = tx.clone();
thread::spawn(move || tx.send(future.wait().unwrap()));
let result_of_future =
cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
match result_of_future {
Err(full_error) => {
tx.send(Err(full_error)).unwrap();
}
Ok(future) => {
let tx = tx.clone();
thread::spawn(move || {
tx.send(future.wait()).unwrap();
});
}
}
thread::sleep(Duration::from_millis(100));
}

// verify
for _ in 2..5 {
let resp: coppb::Response = rx.recv().unwrap();
assert_eq!(resp.get_data().len(), 0);
assert!(resp.has_region_error());
assert!(resp.get_region_error().has_server_is_busy());
assert_eq!(
resp.get_region_error().get_server_is_busy().get_reason(),
BUSY_ERROR_MSG
);
assert!(rx.recv().unwrap().is_err());
}
for i in 0..2 {
let resp = rx.recv().unwrap();
let resp = rx.recv().unwrap().unwrap();
assert_eq!(resp.get_data(), [1, 2, i]);
assert!(!resp.has_region_error());
}
@@ -844,6 +825,7 @@ mod tests {
});
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert_eq!(resp.get_data().len(), 0);
@@ -865,6 +847,7 @@ mod tests {
});
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -884,6 +867,7 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(responses).into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -907,6 +891,7 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -942,6 +927,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -967,6 +953,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -992,6 +979,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
@@ -1029,6 +1017,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.take(7)
.collect()
.wait()
@@ -1086,8 +1075,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
@@ -1100,8 +1090,9 @@ mod tests {
.into_boxed(),
)
});
let resp_future_2 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_2 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_2.wait().unwrap()]).unwrap());
thread::sleep(Duration::from_millis(SNAPSHOT_DURATION_MS as u64));
@@ -1159,8 +1150,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
@@ -1182,8 +1174,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_3 =
cop.handle_stream_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_3 = cop
.handle_stream_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || {
sender
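The stream path above (`parse_and_handle_stream_request`) applies the same outer conversion as the unary path, lifted into a stream: `stream::once` turns the `Result` into a one-item stream, `flatten` splices in the per-request response stream, and `or_else` converts any error into a single error `Response`. Below is a minimal sketch of that shape, not TiKV code; `Response`, `Error`, and `build_stream` are stand-ins, again assuming futures 0.1.

```rust
extern crate futures; // futures 0.1

use futures::{stream, Future, Stream};

#[derive(Debug)]
struct Response(String);

type Error = String;

fn make_error_response(e: Error) -> Response {
    Response(format!("error: {}", e))
}

// `Ok` carries the per-request response stream; `Err` is a parse/schedule failure.
fn build_stream(ok: bool) -> Result<impl Stream<Item = Response, Error = Error>, Error> {
    if ok {
        Ok(stream::iter_ok(vec![
            Response("chunk 1".to_owned()),
            Response("chunk 2".to_owned()),
        ]))
    } else {
        Err("request outdated.".to_owned())
    }
}

// Same shape as `parse_and_handle_stream_request`: lift, flatten, convert errors.
fn to_response_stream(ok: bool) -> impl Stream<Item = Response, Error = ()> {
    stream::once(build_stream(ok)) // Stream<Stream<Response, Error>, Error>
        .flatten()                 // Stream<Response, Error>
        .or_else(|e| Ok(make_error_response(e))) // Stream<Response, ()>
}

fn main() {
    println!("{:?}", to_response_stream(true).collect().wait().unwrap());
    println!("{:?}", to_response_stream(false).collect().wait().unwrap());
}
```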
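Inside `handle_stream_request` above, responses travel through an mpsc channel whose items are `Result`s, so handler errors survive the hop out of the read pool: `.then(Ok)` wraps every outcome (item or error) into a channel item, `forward` drives them into the sender, and the receiver restores the original error type with `.then(|r| r.unwrap())`. Here is a standalone sketch of that trick with the futures 0.1 channel; plain integers and a string error stand in for `coppb::Response` and the coprocessor `Error`.

```rust
extern crate futures; // futures 0.1

use futures::sync::mpsc;
use futures::{stream, Future, Stream};

fn main() {
    // A source stream that yields items and then an error, like the stream
    // produced by the request handler.
    let source = stream::iter_result(vec![Ok(1u32), Ok(2), Err("boom")]);

    // The channel carries `Result` items, so the error is not lost in transit.
    let (tx, rx) = mpsc::channel::<Result<u32, &'static str>>(8);

    // Producer side: wrap every outcome into the channel item type and forward it.
    let producer = source
        .then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<u32, &str>, SendError>
        .forward(tx)
        .map(|_| ())
        .map_err(|_| ()); // ignore sink send failures

    // The buffer is large enough for all items, so this finishes without a
    // concurrent consumer; dropping the sender afterwards terminates `rx`.
    producer.wait().unwrap();

    // Consumer side: unwrap the carried `Result` back into the stream's own
    // error channel, recovering `Stream<u32, &str>`.
    let collected: Result<Vec<u32>, &'static str> = rx.then(|r| r.unwrap()).collect().wait();
    assert_eq!(collected, Err("boom"));
    println!("{:?}", collected);
}
```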

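The failpoint added to `parse_request` (`fail_point!("coprocessor_parse_request", ...)`) lets tests force the parse step to fail and exercise the error path that now flows through `make_error_response`. Below is a hedged sketch of how such a point is typically toggled, assuming the `fail` crate with failpoints compiled in; the free function `parse_request` here is a stand-in, not the endpoint method.

```rust
#[macro_use]
extern crate fail;

// Stand-in for the endpoint method; same failpoint shape as in the diff above.
fn parse_request(req: &str) -> Result<&str, String> {
    fail_point!("coprocessor_parse_request", |_| Err(
        "unsupported tp (failpoint)".to_owned()
    ));
    Ok(req)
}

fn main() {
    // Inactive by default: the failpoint is a no-op.
    assert!(parse_request("select 1").is_ok());

    // Activate it so parsing always fails.
    fail::cfg("coprocessor_parse_request", "return").unwrap();
    assert!(parse_request("select 1").is_err());

    // Deactivate it again.
    fail::remove("coprocessor_parse_request");
    assert!(parse_request("select 1").is_ok());
}
```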