Skip to content

Commit

Permalink
Fix HTTP2StreamChannel leak (#657)
Browse files Browse the repository at this point in the history
* Fix HTTP2StreamChannel leak

* Update code comments.
  • Loading branch information
fabianfett authored Feb 14, 2023
1 parent 9401037 commit e264599
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,25 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {

private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {
switch action {
case .close:
context.close(promise: nil)
case .close, .none:
// The actions returned here come from an `HTTPRequestStateMachine` that assumes http/1.1
// semantics. For this reason we can ignore the close here, since an h2 stream is closed
// after every request anyway.
break

case .sendRequestEnd(let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)

case .none:
break
}
}

private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) {
// We must close the http2 stream after the request has finished. Since the request failed,
// we have no idea what the h2 streams state was. To be on the save side, we explicitly close
// the h2 stream. This will break a reference cycle in HTTP2Connection.
context.close(promise: nil)

switch action {
case .close(let writePromise):
context.close(promise: nil)
writePromise?.fail(error)

case .none:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class HTTP2Connection {

/// We use this channel set to remember, which open streams we need to inform that
/// we want to close the connection. The channels shall than cancel their currently running
/// request.
/// request. This property must only be accessed from the connections `EventLoop`.
private var openStreams = Set<ChannelBox>()
let id: HTTPConnectionPool.Connection.ID
let decompression: HTTPClient.Decompression
Expand Down Expand Up @@ -241,7 +241,7 @@ final class HTTP2Connection {
// before.
let box = ChannelBox(channel)
self.openStreams.insert(box)
self.channel.closeFuture.whenComplete { _ in
channel.closeFuture.whenComplete { _ in
self.openStreams.remove(box)
}

Expand Down Expand Up @@ -287,6 +287,11 @@ final class HTTP2Connection {
preconditionFailure("invalid state \(self.state)")
}
}

func __forTesting_getStreamChannels() -> [Channel] {
self.channel.eventLoop.preconditionInEventLoop()
return self.openStreams.map { $0.channel }
}
}

extension HTTP2Connection: HTTP2IdleHandlerDelegate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,15 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

// the handler only writes once the channel is writable
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
XCTAssertTrue(embedded.isActive)
embedded.isWritable = true
embedded.pipeline.fireChannelWritabilityChanged()

XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? WriteError, WriteError())
}

XCTAssertEqual(embedded.isActive, false)
XCTAssertFalse(embedded.isActive)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extension HTTP2ConnectionTests {
("testSimpleGetRequest", testSimpleGetRequest),
("testEveryDoneRequestLeadsToAStreamAvailableCall", testEveryDoneRequestLeadsToAStreamAvailableCall),
("testCancelAllRunningRequests", testCancelAllRunningRequests),
("testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone", testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone),
]
}
}
93 changes: 93 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,99 @@ class HTTP2ConnectionTests: XCTestCase {

XCTAssertNoThrow(try http2Connection.closeFuture.wait())
}

func testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone() {
class SucceedPromiseOnRequestHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

let dataArrivedPromise: EventLoopPromise<Void>
let triggerResponseFuture: EventLoopFuture<Void>

init(dataArrivedPromise: EventLoopPromise<Void>, triggerResponseFuture: EventLoopFuture<Void>) {
self.dataArrivedPromise = dataArrivedPromise
self.triggerResponseFuture = triggerResponseFuture
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.dataArrivedPromise.succeed(())

self.triggerResponseFuture.hop(to: context.eventLoop).whenSuccess {
switch self.unwrapInboundIn(data) {
case .head:
context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
case .body, .end:
break
}
}
}
}

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let eventLoop = eventLoopGroup.next()
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let serverReceivedRequestPromise = eventLoop.makePromise(of: Void.self)
let triggerResponsePromise = eventLoop.makePromise(of: Void.self)
let httpBin = HTTPBin(.http2(compress: false)) { _ in
SucceedPromiseOnRequestHandler(
dataArrivedPromise: serverReceivedRequestPromise,
triggerResponseFuture: triggerResponsePromise.futureResult
)
}
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let connectionCreator = TestConnectionCreator()
let delegate = TestHTTP2ConnectionDelegate()
var maybeHTTP2Connection: HTTP2Connection?
XCTAssertNoThrow(maybeHTTP2Connection = try connectionCreator.createHTTP2Connection(
to: httpBin.port,
delegate: delegate,
on: eventLoop
))
guard let http2Connection = maybeHTTP2Connection else {
return XCTFail("Expected to have an HTTP2 connection here.")
}

var maybeRequest: HTTPClient.Request?
var maybeRequestBag: RequestBag<ResponseAccumulator>?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)"))
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: XCTUnwrap(maybeRequest),
eventLoopPreference: .indifferent,
task: .init(eventLoop: eventLoop, logger: .init(label: "test")),
redirectHandler: nil,
connectionDeadline: .distantFuture,
requestOptions: .forTests(),
delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest))
))
guard let requestBag = maybeRequestBag else {
return XCTFail("Expected to have a request bag at this point")
}

http2Connection.executeRequest(requestBag)

XCTAssertNoThrow(try serverReceivedRequestPromise.futureResult.wait())
var channelCount: Int?
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
XCTAssertEqual(channelCount, 1)
triggerResponsePromise.succeed(())

XCTAssertNoThrow(try requestBag.task.futureResult.wait())

// this is racy. for this reason we allow a couple of tries
var retryCount = 0
let maxRetries = 1000
while retryCount < maxRetries {
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
if channelCount == 0 {
break
}
retryCount += 1
}
XCTAssertLessThan(retryCount, maxRetries)
}
}

class TestConnectionCreator {
Expand Down

0 comments on commit e264599

Please sign in to comment.