From d2407d0ffcc45bca73a2339a456dee741114d90e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 12 Jul 2019 09:08:17 +0200 Subject: [PATCH] Remove Redundant Setting of OP_WRITE Interest (#43653) (#44255) * Remove Redundant Setting of OP_WRITE Interest * We shouldn't have to set OP_WRITE interest before running into a partial write. Since setting OP_WRITE is handled by the `eventHandler.postHandling` logic, I think we can simply remove this operation and simplify/remove tests that were testing the setting of the write interest --- .../org/elasticsearch/nio/NioSelector.java | 5 +-- .../elasticsearch/nio/NioSelectorTests.java | 36 ------------------- 2 files changed, 3 insertions(+), 38 deletions(-) diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index fff07f625ff13..175c7661813ba 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -28,6 +28,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -196,7 +197,8 @@ void cleanupAndCloseChannels() { cleanupPendingWrites(); channelsToClose.addAll(channelsToRegister); channelsToRegister.clear(); - channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext) sk.attachment()).collect(Collectors.toList())); + channelsToClose.addAll(selector.keys().stream() + .map(sk -> (ChannelContext) sk.attachment()).filter(Objects::nonNull).collect(Collectors.toList())); closePendingChannels(); } @@ -342,7 +344,6 @@ private void writeToChannel(WriteOperation writeOperation) { // the write operation is queued. boolean shouldFlushAfterQueuing = context.readyForFlush() == false; try { - SelectionKeyUtils.setWriteInterested(context.getSelectionKey()); context.queueWriteOperation(writeOperation); } catch (Exception e) { shouldFlushAfterQueuing = false; diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index 55d2e645cadee..89a01d02ed42d 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; @@ -314,21 +313,6 @@ public void testQueueWriteChannelIsClosed() throws Exception { verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class)); } - public void testQueueWriteSelectionKeyThrowsException() throws Exception { - SelectionKey selectionKey = mock(SelectionKey.class); - - WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); - CancelledKeyException cancelledKeyException = new CancelledKeyException(); - executeOnNewThread(() -> selector.queueWrite(writeOperation)); - - when(channelContext.getSelectionKey()).thenReturn(selectionKey); - when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); - selector.preSelect(); - - verify(channelContext, times(0)).queueWriteOperation(writeOperation); - verify(listener).accept(null, cancelledKeyException); - } - public void testQueueWriteSuccessful() throws Exception { WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); executeOnNewThread(() -> selector.queueWrite(writeOperation)); @@ -338,7 +322,6 @@ public void testQueueWriteSuccessful() throws Exception { selector.preSelect(); verify(channelContext).queueWriteOperation(writeOperation); - assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); } public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { @@ -352,7 +335,6 @@ public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { verify(channelContext).queueWriteOperation(writeOperation); verify(eventHandler, times(0)).handleWrite(channelContext); verify(eventHandler, times(0)).postHandling(channelContext); - assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); } public void testShouldFlushIfNoPendingFlushes() throws Exception { @@ -366,24 +348,6 @@ public void testShouldFlushIfNoPendingFlushes() throws Exception { verify(channelContext).queueWriteOperation(writeOperation); verify(eventHandler).handleWrite(channelContext); verify(eventHandler).postHandling(channelContext); - assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); - } - - public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws Exception { - SelectionKey selectionKey = mock(SelectionKey.class); - - WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); - CancelledKeyException cancelledKeyException = new CancelledKeyException(); - - when(channelContext.getSelectionKey()).thenReturn(selectionKey); - when(channelContext.readyForFlush()).thenReturn(false); - when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); - selector.queueWrite(writeOperation); - - verify(channelContext, times(0)).queueWriteOperation(writeOperation); - verify(eventHandler, times(0)).handleWrite(channelContext); - verify(eventHandler, times(0)).postHandling(channelContext); - verify(listener).accept(null, cancelledKeyException); } public void testConnectEvent() throws Exception {