Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HTTP2StreamChannel leak #657

Merged
merged 3 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,25 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {

private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {
switch action {
case .close:
context.close(promise: nil)
case .close, .none:
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
// The actions returned here come from an `HTTPRequestStateMachine` that assumes http/1.1
// semantics. For this reason we can ignore the close here, since an h2 stream is closed
// after every request anyway.
break

case .sendRequestEnd(let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)

case .none:
break
}
}

private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) {
// We must close the http2 stream after the request has finished. Since the request failed,
// we have no idea what the h2 streams state was. To be on the save side, we explicitly close
// the h2 stream. This will break a reference cycle in HTTP2Connection.
context.close(promise: nil)
glbrntt marked this conversation as resolved.
Show resolved Hide resolved

switch action {
case .close(let writePromise):
context.close(promise: nil)
writePromise?.fail(error)

case .none:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class HTTP2Connection {

/// We use this channel set to remember, which open streams we need to inform that
/// we want to close the connection. The channels shall than cancel their currently running
/// request.
/// request. This property must only be accessed from the connections `EventLoop`.
private var openStreams = Set<ChannelBox>()
let id: HTTPConnectionPool.Connection.ID
let decompression: HTTPClient.Decompression
Expand Down Expand Up @@ -241,7 +241,7 @@ final class HTTP2Connection {
// before.
let box = ChannelBox(channel)
self.openStreams.insert(box)
self.channel.closeFuture.whenComplete { _ in
channel.closeFuture.whenComplete { _ in
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual bug fix. However I have no idea how to test it!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have run into this just today. If we send a too large header and the server sends a go away frame the child channel is closed but the parent channel not. This might be a good way to test this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can try to test this by triggering the shutdown event and finding which channels get sent this user event. So long as we can hook the parent channel we should observe that it doesn't see it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: How do I get access to the stream/child channel? All the ideas I had so far a quite locked down. Of course I can add an internal function that gives me the currently active channels.

func forTestingGetActiveChildChannels -> EventLoopFuture<[Channel]>

wdyt?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding a testing function is fine. You can guard it behind an SPI, which is probably sufficient.

self.openStreams.remove(box)
}

Expand Down Expand Up @@ -287,6 +287,11 @@ final class HTTP2Connection {
preconditionFailure("invalid state \(self.state)")
}
}

func __forTesting_getStreamChannels() -> [Channel] {
self.channel.eventLoop.preconditionInEventLoop()
return self.openStreams.map { $0.channel }
}
}

extension HTTP2Connection: HTTP2IdleHandlerDelegate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,15 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

// the handler only writes once the channel is writable
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
XCTAssertTrue(embedded.isActive)
embedded.isWritable = true
embedded.pipeline.fireChannelWritabilityChanged()

XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? WriteError, WriteError())
}

XCTAssertEqual(embedded.isActive, false)
XCTAssertFalse(embedded.isActive)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extension HTTP2ConnectionTests {
("testSimpleGetRequest", testSimpleGetRequest),
("testEveryDoneRequestLeadsToAStreamAvailableCall", testEveryDoneRequestLeadsToAStreamAvailableCall),
("testCancelAllRunningRequests", testCancelAllRunningRequests),
("testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone", testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone),
]
}
}
93 changes: 93 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,99 @@ class HTTP2ConnectionTests: XCTestCase {

XCTAssertNoThrow(try http2Connection.closeFuture.wait())
}

func testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone() {
class SucceedPromiseOnRequestHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

let dataArrivedPromise: EventLoopPromise<Void>
let triggerResponseFuture: EventLoopFuture<Void>

init(dataArrivedPromise: EventLoopPromise<Void>, triggerResponseFuture: EventLoopFuture<Void>) {
self.dataArrivedPromise = dataArrivedPromise
self.triggerResponseFuture = triggerResponseFuture
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.dataArrivedPromise.succeed(())

self.triggerResponseFuture.hop(to: context.eventLoop).whenSuccess {
switch self.unwrapInboundIn(data) {
case .head:
context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
case .body, .end:
break
}
}
}
}

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let eventLoop = eventLoopGroup.next()
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let serverReceivedRequestPromise = eventLoop.makePromise(of: Void.self)
let triggerResponsePromise = eventLoop.makePromise(of: Void.self)
let httpBin = HTTPBin(.http2(compress: false)) { _ in
SucceedPromiseOnRequestHandler(
dataArrivedPromise: serverReceivedRequestPromise,
triggerResponseFuture: triggerResponsePromise.futureResult
)
}
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let connectionCreator = TestConnectionCreator()
let delegate = TestHTTP2ConnectionDelegate()
var maybeHTTP2Connection: HTTP2Connection?
XCTAssertNoThrow(maybeHTTP2Connection = try connectionCreator.createHTTP2Connection(
to: httpBin.port,
delegate: delegate,
on: eventLoop
))
guard let http2Connection = maybeHTTP2Connection else {
return XCTFail("Expected to have an HTTP2 connection here.")
}

var maybeRequest: HTTPClient.Request?
var maybeRequestBag: RequestBag<ResponseAccumulator>?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)"))
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: XCTUnwrap(maybeRequest),
eventLoopPreference: .indifferent,
task: .init(eventLoop: eventLoop, logger: .init(label: "test")),
redirectHandler: nil,
connectionDeadline: .distantFuture,
requestOptions: .forTests(),
delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest))
))
guard let requestBag = maybeRequestBag else {
return XCTFail("Expected to have a request bag at this point")
}

http2Connection.executeRequest(requestBag)

XCTAssertNoThrow(try serverReceivedRequestPromise.futureResult.wait())
var channelCount: Int?
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
XCTAssertEqual(channelCount, 1)
triggerResponsePromise.succeed(())

XCTAssertNoThrow(try requestBag.task.futureResult.wait())

// this is racy. for this reason we allow a couple of tries
var retryCount = 0
let maxRetries = 1000
while retryCount < maxRetries {
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
if channelCount == 0 {
break
}
retryCount += 1
}
XCTAssertLessThan(retryCount, maxRetries)
}
}

class TestConnectionCreator {
Expand Down