Skip to content

Commit

Permalink
don't crash when Channel goes inactive in read triggered by write err…
Browse files Browse the repository at this point in the history
…or (#594)

Motivation:

Previously we asserted that a Channel cannot go inactive after calling
out for a read that was triggered by (draining the receive buffer after)
a write error.

This assertion was put in place to guard against `readComplete` events
sent on inactive channels. It did that job just fine but crashes aren't
great so we now conditionally fire the `readComplete` event if the
Channel stays active.

Modifications:

make the readComplete event firing conditional

Result:

fewer crashes, more happy faces
  • Loading branch information
weissi authored Aug 27, 2018
1 parent ce0c6d9 commit df764fe
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 8 deletions.
9 changes: 6 additions & 3 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -443,17 +443,20 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
} catch let err {
// If there is a write error we should try drain the inbound before closing the socket as there may be some data pending.
// We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
if readIfNeeded0() {
if self.readIfNeeded0() {
assert(self.lifecycleManager.isActive)

// We need to continue reading until there is nothing more to be read from the socket as we will not have another chance to drain it.
var readAtLeastOnce = false
while let read = try? readFromSocket(), read == .some {
assert(self.lifecycleManager.isActive)
readAtLeastOnce = true
}
if readAtLeastOnce && self.lifecycleManager.isActive {
pipeline.fireChannelReadComplete()
}
}

close0(error: err, mode: .all, promise: nil)
self.close0(error: err, mode: .all, promise: nil)

// we handled all writes
return .unregister
Expand Down
3 changes: 2 additions & 1 deletion Sources/NIO/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -931,10 +931,11 @@ extension ChannelPipeline {
}

private extension CloseMode {
/// Returns the error to fail outstanding operations writes with.
var error: ChannelError {
switch self {
case .all:
return .alreadyClosed
return .ioOnClosedChannel
case .output:
return .outputClosed
case .input:
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/ChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ extension ChannelTests {
("testFailedRegistrationOfAcceptedSocket", testFailedRegistrationOfAcceptedSocket),
("testFailedRegistrationOfServerSocket", testFailedRegistrationOfServerSocket),
("testTryingToBindOnPortThatIsAlreadyBoundFailsButDoesNotCrash", testTryingToBindOnPortThatIsAlreadyBoundFailsButDoesNotCrash),
("testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError", testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError),
]
}
}
Expand Down
116 changes: 116 additions & 0 deletions Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2449,6 +2449,122 @@ public class ChannelTests: XCTestCase {
XCTFail("unexpected error: \(error)")
}
}

func testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError() throws {
final class WriteWhenActiveHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

let channelAvailablePromise: EventLoopPromise<Channel>

init(channelAvailablePromise: EventLoopPromise<Channel>) {
self.channelAvailablePromise = channelAvailablePromise
}

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
XCTFail("unexpected read: \(String(decoding: buffer.readableBytesView, as: UTF8.self))")
}

func channelActive(ctx: ChannelHandlerContext) {
var buffer = ctx.channel.allocator.buffer(capacity: 1)
buffer.write(staticString: "X")
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(promise: self.channelAvailablePromise)
}
}

final class WriteAlwaysFailingSocket: Socket {
init() throws {
try super.init(protocolFamily: AF_INET, type: Posix.SOCK_STREAM, setNonBlocking: true)
}

override func write(pointer: UnsafeRawBufferPointer) throws -> IOResult<Int> {
throw IOError(errnoCode: ETXTBSY, function: "WriteAlwaysFailingSocket.write fake error")
}

override func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
throw IOError(errnoCode: ETXTBSY, function: "WriteAlwaysFailingSocket.writev fake error")
}
}

final class MakeChannelInactiveInReadCausedByWriteErrorHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

let serverChannel: EventLoopFuture<Channel>
let allDonePromise: EventLoopPromise<Void>

init(serverChannel: EventLoopFuture<Channel>,
allDonePromise: EventLoopPromise<Void>) {
self.serverChannel = serverChannel
self.allDonePromise = allDonePromise
}

func channelActive(ctx: ChannelHandlerContext) {
XCTAssert(serverChannel.eventLoop === ctx.eventLoop)
self.serverChannel.whenSuccess { serverChannel in
// all of the following futures need to complete synchronously for this test to test the correct
// thing. Therefore we keep track if we're still on the same stack frame.
var inSameStackFrame = true
defer {
inSameStackFrame = false
}

XCTAssertTrue(serverChannel.isActive)
// we allow auto-read again to make sure that the socket buffer is drained on write error
// (cf. https://github.com/apple/swift-nio/issues/593)
ctx.channel.setOption(option: ChannelOptions.autoRead, value: true).then {
// let's trigger the write error
var buffer = ctx.channel.allocator.buffer(capacity: 16)
buffer.write(staticString: "THIS WILL FAIL ANYWAY")
return ctx.writeAndFlush(self.wrapOutboundOut(buffer))
}.map {
XCTFail("this should have failed")
}.whenFailure { error in
XCTAssertEqual(ChannelError.ioOnClosedChannel, error as? ChannelError)
XCTAssertTrue(inSameStackFrame)
self.allDonePromise.succeed(result: ())
}
}
}

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
XCTAssertEqual("X", String(decoding: buffer.readableBytesView, as: UTF8.self))
ctx.close(promise: nil)
}
}

let singleThreadedELG = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try singleThreadedELG.syncShutdownGracefully())
}
let serverChannelAvailablePromise: EventLoopPromise<Channel> = singleThreadedELG.next().newPromise()
let allDonePromise: EventLoopPromise<Void> = singleThreadedELG.next().newPromise()
let server = try assertNoThrowWithValue(ServerBootstrap(group: singleThreadedELG)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
.childChannelInitializer { channel in
channel.pipeline.add(handler: WriteWhenActiveHandler(channelAvailablePromise: serverChannelAvailablePromise))
}
.bind(host: "127.0.0.1", port: 0)
.wait())
defer {
XCTAssertNoThrow(try server.close().wait())
}

let c = try assertNoThrowWithValue(SocketChannel(socket: WriteAlwaysFailingSocket(),
parent: nil,
eventLoop: singleThreadedELG.next() as! SelectableEventLoop))
XCTAssertNoThrow(try c.setOption(option: ChannelOptions.autoRead, value: false).wait())
XCTAssertNoThrow(try c.setOption(option: ChannelOptions.allowRemoteHalfClosure, value: true).wait())
XCTAssertNoThrow(try c.pipeline.add(handler: MakeChannelInactiveInReadCausedByWriteErrorHandler(serverChannel: serverChannelAvailablePromise.futureResult,
allDonePromise: allDonePromise)).wait())
XCTAssertNoThrow(try c.register().wait())
XCTAssertNoThrow(try c.connect(to: server.localAddress!).wait())

XCTAssertNoThrow(try allDonePromise.futureResult.wait())
XCTAssertFalse(c.isActive)
}
}

fileprivate final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler {
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/DatagramChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ final class DatagramChannelTests: XCTestCase {
do {
try $0.wait()
XCTFail("Did not error")
} catch ChannelError.alreadyClosed {
} catch ChannelError.ioOnClosedChannel {
// All good
} catch {
XCTFail("Unexpected error: \(error)")
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/FileRegionTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class FileRegionTest : XCTestCase {
}.wait()
XCTFail("no error happened even though we closed before flush")
} catch let e as ChannelError {
XCTAssertEqual(ChannelError.alreadyClosed, e)
XCTAssertEqual(ChannelError.ioOnClosedChannel, e)
} catch let e {
XCTFail("unexpected error \(e)")
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/NIOTests/SocketChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public class SocketChannelTest : XCTestCase {
let writeFut = clientChannel.write(buffer).map {
XCTFail("Must not succeed")
}.thenIfError { error in
XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
return clientChannel.close()
}
XCTAssertNoThrow(try clientChannel.close().wait())
Expand Down Expand Up @@ -470,7 +470,7 @@ public class SocketChannelTest : XCTestCase {
do {
try connectPromise.futureResult.wait()
XCTFail("Did not throw")
} catch let err as ChannelError where err == .alreadyClosed {
} catch let err as ChannelError where err == .ioOnClosedChannel {
// expected
}
XCTAssertNoThrow(try closePromise.futureResult.wait())
Expand Down

0 comments on commit df764fe

Please sign in to comment.