From 707eef1bc016b33c1c585e8f7cc1ec7993649707 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 24 Jan 2023 17:48:23 +0100 Subject: [PATCH 1/2] Fix HTTP2StreamChannel leak --- .../HTTP2/HTTP2ClientRequestHandler.swift | 12 +-- .../HTTP2/HTTP2Connection.swift | 9 +- .../HTTP2ClientRequestHandlerTests.swift | 3 +- .../HTTP2ConnectionTests+XCTest.swift | 1 + .../HTTP2ConnectionTests.swift | 93 +++++++++++++++++++ 5 files changed, 109 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index 578b83029..48e77d3c5 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -239,21 +239,21 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) { switch action { - case .close: - context.close(promise: nil) + case .close, .none: + break case .sendRequestEnd(let writePromise): context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise) - - case .none: - break } } private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) { + // We must close the http2 stream after the request has finished. This breaks a reference + // cycle in HTTP2Connection. + context.close(promise: nil) + switch action { case .close(let writePromise): - context.close(promise: nil) writePromise?.fail(error) case .none: diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift index 5859e619a..0cad92cfe 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift @@ -77,7 +77,7 @@ final class HTTP2Connection { /// We use this channel set to remember, which open streams we need to inform that /// we want to close the connection. The channels shall than cancel their currently running - /// request. + /// request. This property must only be accessed from the connections `EventLoop`. private var openStreams = Set() let id: HTTPConnectionPool.Connection.ID let decompression: HTTPClient.Decompression @@ -241,7 +241,7 @@ final class HTTP2Connection { // before. let box = ChannelBox(channel) self.openStreams.insert(box) - self.channel.closeFuture.whenComplete { _ in + channel.closeFuture.whenComplete { _ in self.openStreams.remove(box) } @@ -287,6 +287,11 @@ final class HTTP2Connection { preconditionFailure("invalid state \(self.state)") } } + + func __forTesting_getStreamChannels() -> [Channel] { + self.channel.eventLoop.preconditionInEventLoop() + return self.openStreams.map { $0.channel } + } } extension HTTP2Connection: HTTP2IdleHandlerDelegate { diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift index 5dfce3f9d..ec3047758 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift @@ -335,6 +335,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { // the handler only writes once the channel is writable XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none) + XCTAssertTrue(embedded.isActive) embedded.isWritable = true embedded.pipeline.fireChannelWritabilityChanged() @@ -342,7 +343,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { XCTAssertEqual($0 as? WriteError, WriteError()) } - XCTAssertEqual(embedded.isActive, false) + XCTAssertFalse(embedded.isActive) } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift index 06b60f757..f26ca7d38 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift @@ -30,6 +30,7 @@ extension HTTP2ConnectionTests { ("testSimpleGetRequest", testSimpleGetRequest), ("testEveryDoneRequestLeadsToAStreamAvailableCall", testEveryDoneRequestLeadsToAStreamAvailableCall), ("testCancelAllRunningRequests", testCancelAllRunningRequests), + ("testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone", testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift index 652884a84..951a64494 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift @@ -243,6 +243,99 @@ class HTTP2ConnectionTests: XCTestCase { XCTAssertNoThrow(try http2Connection.closeFuture.wait()) } + + func testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone() { + class SucceedPromiseOnRequestHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + let dataArrivedPromise: EventLoopPromise + let triggerResponseFuture: EventLoopFuture + + init(dataArrivedPromise: EventLoopPromise, triggerResponseFuture: EventLoopFuture) { + self.dataArrivedPromise = dataArrivedPromise + self.triggerResponseFuture = triggerResponseFuture + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + self.dataArrivedPromise.succeed(()) + + self.triggerResponseFuture.hop(to: context.eventLoop).whenSuccess { + switch self.unwrapInboundIn(data) { + case .head: + context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + case .body, .end: + break + } + } + } + } + + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let serverReceivedRequestPromise = eventLoop.makePromise(of: Void.self) + let triggerResponsePromise = eventLoop.makePromise(of: Void.self) + let httpBin = HTTPBin(.http2(compress: false)) { _ in + SucceedPromiseOnRequestHandler( + dataArrivedPromise: serverReceivedRequestPromise, + triggerResponseFuture: triggerResponsePromise.futureResult + ) + } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + let connectionCreator = TestConnectionCreator() + let delegate = TestHTTP2ConnectionDelegate() + var maybeHTTP2Connection: HTTP2Connection? + XCTAssertNoThrow(maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( + to: httpBin.port, + delegate: delegate, + on: eventLoop + )) + guard let http2Connection = maybeHTTP2Connection else { + return XCTFail("Expected to have an HTTP2 connection here.") + } + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoop, logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + requestOptions: .forTests(), + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + guard let requestBag = maybeRequestBag else { + return XCTFail("Expected to have a request bag at this point") + } + + http2Connection.executeRequest(requestBag) + + XCTAssertNoThrow(try serverReceivedRequestPromise.futureResult.wait()) + var channelCount: Int? + XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait()) + XCTAssertEqual(channelCount, 1) + triggerResponsePromise.succeed(()) + + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + + // this is racy. for this reason we allow a couple of tries + var retryCount = 0 + let maxRetries = 1000 + while retryCount < maxRetries { + XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait()) + if channelCount == 0 { + break + } + retryCount += 1 + } + XCTAssertLessThan(retryCount, maxRetries) + } } class TestConnectionCreator { From 19d3599f198a4a751bd1bbc6b2627d53a8128b64 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 13 Feb 2023 19:33:16 +0100 Subject: [PATCH 2/2] Update code comments. --- .../ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index 9a58d8524..0e8e819e8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -238,6 +238,9 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) { switch action { case .close, .none: + // The actions returned here come from an `HTTPRequestStateMachine` that assumes http/1.1 + // semantics. For this reason we can ignore the close here, since an h2 stream is closed + // after every request anyway. break case .sendRequestEnd(let writePromise): @@ -246,8 +249,9 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { } private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) { - // We must close the http2 stream after the request has finished. This breaks a reference - // cycle in HTTP2Connection. + // We must close the http2 stream after the request has finished. Since the request failed, + // we have no idea what the h2 streams state was. To be on the save side, we explicitly close + // the h2 stream. This will break a reference cycle in HTTP2Connection. context.close(promise: nil) switch action {