Skip to content

Commit

Permalink
restore cancel handle for node http request
Browse files Browse the repository at this point in the history
  • Loading branch information
kt3k committed Dec 10, 2024
1 parent 3ec165c commit 0e3e450
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 10 deletions.
39 changes: 31 additions & 8 deletions ext/node/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Canceled;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchError;
use deno_fetch::FetchReturn;
use deno_fetch::ResBody;
use deno_net::io::TcpStreamResource;
use deno_net::ops_tls::TlsStreamResource;
Expand All @@ -45,7 +49,6 @@ use http::header::HeaderValue;
use http::header::AUTHORIZATION;
use http::header::CONTENT_LENGTH;
use http::Method;
use http::Response;
use http_body_util::BodyExt;
use hyper::body::Frame;
use hyper::body::Incoming;
Expand All @@ -68,9 +71,11 @@ pub struct NodeHttpResponse {
pub error: Option<String>,
}

type CancelableResponseResult =
Result<Result<http::Response<Incoming>, Error>, Canceled>;

pub struct NodeHttpClientResponse {
response:
Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>,
response: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
url: String,
}

Expand Down Expand Up @@ -98,7 +103,7 @@ pub async fn op_node_http_request_with_conn<P>(
#[smi] body: Option<ResourceId>,
#[smi] conn_rid: ResourceId,
encrypted: bool,
) -> Result<ResourceId, AnyError>
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
{
Expand Down Expand Up @@ -213,16 +218,34 @@ where
request.headers_mut().insert(CONTENT_LENGTH, len.into());
}

let res = sender.send_request(request).map_err(Error::from).boxed();
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();

let fut = async move {
sender
.send_request(request)
.map_err(Error::from)
.or_cancel(cancel_handle_)
.await
};

let rid = state
.borrow_mut()
.resource_table
.add(NodeHttpClientResponse {
response: res,
response: Box::pin(fut),
url: url.clone(),
});

Ok(rid)
let cancel_handle_rid = state
.borrow_mut()
.resource_table
.add(FetchCancelHandle(cancel_handle));

Ok(FetchReturn {
request_rid: rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}

#[op2(async)]
Expand All @@ -238,7 +261,7 @@ pub async fn op_node_http_await_response(
let resource = Rc::try_unwrap(resource)
.map_err(|_| bad_resource("NodeHttpClientResponse"))?;

let res = resource.response.await?;
let res = resource.response.await??;
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
Expand Down
17 changes: 15 additions & 2 deletions ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import {
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";
import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js";
import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
import { UpgradedConn } from "ext:deno_net/01_net.js";
import { STATUS_CODES } from "node:_http_server";
Expand Down Expand Up @@ -463,7 +465,7 @@ class ClientRequest extends OutgoingMessage {
alpnProtocols: ["http/1.0", "http/1.1"],
});
}
const rid = await op_node_http_request_with_conn(
this._req = await op_node_http_request_with_conn(
this.method,
url,
headers,
Expand All @@ -472,7 +474,14 @@ class ClientRequest extends OutgoingMessage {
this._encrypted,
);
this._flushBuffer();
const res = await op_node_http_await_response(rid);
const res = await op_node_http_await_response(this._req!.requestRid);
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
if (this._timeout) {
this._timeout.removeEventListener("abort", this._timeoutCb);
webClearTimeout(this._timeout[timerId]);
}
const incoming = new IncomingMessageForClient(this.socket);
incoming.req = this;
this.res = incoming;
Expand Down Expand Up @@ -543,6 +552,10 @@ class ClientRequest extends OutgoingMessage {
this.emit("response", incoming);
}
} catch (err) {
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}

if (this._requestSendError !== undefined) {
// if the request body stream errored, we want to propagate that error
// instead of the original error from opFetchSend
Expand Down

0 comments on commit 0e3e450

Please sign in to comment.