Skip to content

Commit

Permalink
update concurrency-limit to std::future (#492)
Browse files Browse the repository at this point in the history
This branch updates the `concurrency-limit` middleware to std::future.
The new implementation uses the new owned semaphore permit API added in
Tokio 0.2.19 (tokio-rs/tokio#2421). Similarly to #490, the old
implementation could not be directly translated due to `tokio::sync`'s
`Semaphore` losing its `poll`-based API in 0.2.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw authored Apr 27, 2020
1 parent 10675bd commit d0c0b22
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 89 deletions.
40 changes: 20 additions & 20 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ dependencies = [
"ring",
"rustls",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-compat",
"tokio-connect",
"tokio-current-thread",
Expand Down Expand Up @@ -790,7 +790,7 @@ dependencies = [
"rand 0.7.2",
"regex 1.0.0",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-compat",
"tokio-timer",
"tower 0.3.1",
Expand Down Expand Up @@ -899,7 +899,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
"tower 0.3.1",
"tower-test",
Expand All @@ -925,10 +925,10 @@ dependencies = [
name = "linkerd2-concurrency-limit"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"linkerd2-error",
"tokio-sync",
"tower 0.1.1",
"futures 0.3.4",
"pin-project",
"tokio 0.2.19",
"tower 0.3.1",
"tracing",
]

Expand Down Expand Up @@ -964,7 +964,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
]

Expand Down Expand Up @@ -1174,7 +1174,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
"tracing-futures 0.2.3",
]
Expand All @@ -1188,7 +1188,7 @@ dependencies = [
"linkerd2-io",
"linkerd2-proxy-core",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
]

Expand Down Expand Up @@ -1234,7 +1234,7 @@ dependencies = [
"rand 0.7.2",
"task-compat",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-connect",
"tokio-timer",
"tower 0.1.1",
Expand Down Expand Up @@ -1437,7 +1437,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
]

Expand Down Expand Up @@ -1485,7 +1485,7 @@ dependencies = [
"linkerd2-error",
"linkerd2-stack",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-connect",
"tokio-test",
"tower 0.3.1",
Expand Down Expand Up @@ -2446,9 +2446,9 @@ dependencies = [

[[package]]
name = "tokio"
version = "0.2.17"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39fb9142eb6e9cc37f4f29144e62618440b149a138eee01a7bbe9b9226aaf17c"
checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c"
dependencies = [
"bytes 0.5.3",
"fnv",
Expand Down Expand Up @@ -2493,7 +2493,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-current-thread",
"tokio-executor",
"tokio-reactor",
Expand Down Expand Up @@ -2643,7 +2643,7 @@ checksum = "09cf9705471976fa5fc6817d3fbc9c4ff9696a6647af0e5c1870c81ca7445b05"
dependencies = [
"bytes 0.5.3",
"futures-core",
"tokio 0.2.17",
"tokio 0.2.19",
]

[[package]]
Expand Down Expand Up @@ -2719,7 +2719,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite",
"tokio 0.2.17",
"tokio 0.2.19",
]

[[package]]
Expand Down Expand Up @@ -2748,7 +2748,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)",
"tower-service 0.3.0",
"tracing",
Expand Down Expand Up @@ -2965,7 +2965,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8"
dependencies = [
"futures-util",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
"tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.0",
Expand Down
12 changes: 6 additions & 6 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ impl<S> Stack<S> {
self.push(SpawnReadyLayer::new())
}

// pub fn push_concurrency_limit(
// self,
// max: usize,
// ) -> Stack<concurrency_limit::ConcurrencyLimit<S>> {
// self.push(concurrency_limit::Layer::new(max))
// }
pub fn push_concurrency_limit(
self,
max: usize,
) -> Stack<concurrency_limit::ConcurrencyLimit<S>> {
self.push(concurrency_limit::Layer::new(max))
}

pub fn push_timeout(self, timeout: Duration) -> Stack<tower::timeout::Timeout<S>> {
self.push(tower::timeout::TimeoutLayer::new(timeout))
Expand Down
8 changes: 4 additions & 4 deletions linkerd/concurrency-limit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ publish = false


[dependencies]
futures = "0.1"
linkerd2-error = { path = "../error" }
tokio-sync = "0.1"
tower = "0.1"
futures = "0.3"
tokio = { version = "0.2.19", features = ["sync"] }
tower = { version = "0.3", default-features = false }
tracing = "0.1"
pin-project = "0.4"
116 changes: 57 additions & 59 deletions linkerd/concurrency-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

#![deny(warnings, rust_2018_idioms)]

use futures::{try_ready, Future, Poll};
use linkerd2_error::Error;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio_sync::semaphore::{Permit, Semaphore};
use std::task::{Context, Poll};
use std::{fmt, mem};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tower::Service;
use tracing::trace;

Expand All @@ -22,20 +25,24 @@ pub struct Layer {
#[derive(Debug)]
pub struct ConcurrencyLimit<T> {
inner: T,
limit: Limit,
semaphore: Arc<Semaphore>,
state: State,
}

#[derive(Debug)]
struct Limit {
semaphore: Arc<Semaphore>,
permit: Permit,
enum State {
Waiting(Pin<Box<dyn Future<Output = OwnedSemaphorePermit> + 'static>>),
Ready(OwnedSemaphorePermit),
Empty,
}

/// Future for the `ConcurrencyLimit` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
semaphore: Arc<Semaphore>,
// We only keep this around so that it is dropped when the future completes.
_permit: OwnedSemaphorePermit,
}

impl From<Arc<Semaphore>> for Layer {
Expand Down Expand Up @@ -63,10 +70,8 @@ impl<T> ConcurrencyLimit<T> {
pub fn new(inner: T, semaphore: Arc<Semaphore>) -> Self {
ConcurrencyLimit {
inner,
limit: Limit {
semaphore,
permit: Permit::new(),
},
semaphore,
state: State::Empty,
}
}

Expand All @@ -89,45 +94,42 @@ impl<T> ConcurrencyLimit<T> {
impl<S, Request> Service<Request> for ConcurrencyLimit<S>
where
S: Service<Request>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type Error = S::Error;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
trace!(available = %self.limit.semaphore.available_permits(), "acquiring permit");
try_ready!(self
.limit
.permit
.poll_acquire(&self.limit.semaphore)
.map_err(Error::from));

self.inner.poll_ready().map_err(Into::into)
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
trace!(available = %self.semaphore.available_permits(), "acquiring permit");
loop {
self.state = match self.state {
State::Ready(_) => {
trace!(available = %self.semaphore.available_permits(), "permit acquired");
return Poll::Ready(Ok(()));
}
State::Waiting(ref mut fut) => {
tokio::pin!(fut);
let permit = futures::ready!(fut.poll(cx));
State::Ready(permit)
}
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
};
}
}

fn call(&mut self, request: Request) -> Self::Future {
// Make sure a permit has been acquired
if self
.limit
.permit
.try_acquire(&self.limit.semaphore)
.is_err()
{
panic!("max requests in-flight; poll_ready must be called first");
}
let _permit = match mem::replace(&mut self.state, State::Empty) {
// Take the permit.
State::Ready(permit) => permit,
// whoopsie!
_ => panic!("max requests in-flight; poll_ready must be called first"),
};

// Call the inner service
let inner = self.inner.call(request);

// Forget the permit, the permit will be returned when
// `future::ResponseFuture` is dropped.
self.limit.permit.forget();

ResponseFuture {
inner,
semaphore: self.limit.semaphore.clone(),
}
ResponseFuture { inner, _permit }
}
}

Expand All @@ -138,36 +140,32 @@ where
fn clone(&self) -> ConcurrencyLimit<S> {
ConcurrencyLimit {
inner: self.inner.clone(),
limit: Limit {
semaphore: self.limit.semaphore.clone(),
permit: Permit::new(),
},
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}

impl Drop for Limit {
fn drop(&mut self) {
self.permit.release(&self.semaphore);
}
}

impl<T> Future for ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
type Item = T::Item;
type Error = Error;
type Output = T::Output;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Into::into)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl<T> Drop for ResponseFuture<T> {
fn drop(&mut self) {
trace!(available = %self.semaphore.available_permits() + 1, "releasing permit");
self.semaphore.add_permits(1);
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Waiting(_) => f
.debug_tuple("State::Waiting")
.field(&format_args!("..."))
.finish(),
State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(),
State::Empty => f.debug_tuple("State::Empty").finish(),
}
}
}

0 comments on commit d0c0b22

Please sign in to comment.