Skip to content

Commit

Permalink
bugfix, outbound/failover: fail-timeout should cover the connect time
Browse files Browse the repository at this point in the history
  • Loading branch information
eycorsican committed May 23, 2023
1 parent bd24848 commit b4b7da5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
49 changes: 33 additions & 16 deletions leaf/src/proxy/failover/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,29 +120,46 @@ impl OutboundDatagramHandler for Handler {
let a = &self.actors[i];

debug!(
"failover handles udp [{}] to [{}]",
"[{}] handles [{}:{}] to [{}]",
a.tag(),
sess.network,
sess.destination,
a.tag()
);

match timeout(
Duration::from_secs(self.fail_timeout as u64),
a.datagram()?.handle(
sess,
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
),
)
.await
{
// return before timeout
let try_outbound = async move {
a.datagram()?
.handle(
sess,
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
)
.await
};

match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
Ok(t) => match t {
// return ok
Ok(v) => return Ok(v),
// return err
Err(_) => continue,
Err(e) => {
trace!(
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
continue;
}
},
// after timeout
Err(_) => continue,
Err(e) => {
trace!(
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
continue;
}
}
}

Expand Down
41 changes: 17 additions & 24 deletions leaf/src/proxy/failover/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,32 +159,23 @@ impl OutboundStreamHandler for Handler {
let a = &self.actors[actor_idx];

debug!(
"failover handles tcp [{}] to [{}]",
"[{}] handles [{}:{}] to [{}]",
a.tag(),
sess.network,
sess.destination,
a.tag()
);

match timeout(
Duration::from_secs(self.fail_timeout as u64),
a.stream()?.handle(
sess,
match connect_stream_outbound(sess, self.dns_client.clone(), a).await {
Ok(v) => v,
Err(e) => {
trace!(
"[{}] failed to handle [{}]: {}",
a.tag(),
sess.destination,
e,
);
continue;
}
},
),
)
.await
{
// return before timeout
let try_outbound = async move {
a.stream()?
.handle(
sess,
connect_stream_outbound(sess, self.dns_client.clone(), a).await?,
)
.await
};

match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
Ok(t) => match t {
Ok(v) => {
// Only cache for fallback actors.
Expand All @@ -199,8 +190,9 @@ impl OutboundStreamHandler for Handler {
}
Err(e) => {
trace!(
"[{}] failed to handle [{}]: {}",
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
Expand All @@ -209,8 +201,9 @@ impl OutboundStreamHandler for Handler {
},
Err(e) => {
trace!(
"[{}] failed to handle [{}]: {}",
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
Expand Down

0 comments on commit b4b7da5

Please sign in to comment.