From 6d6cd8ffea122f06570e4c85adae4e4dcd6a6943 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Sat, 1 Oct 2016 19:55:04 +0800 Subject: [PATCH] (fix) Fixed decoding command may block recator thread, close #30. --- .../yanf4j/nio/impl/AbstractNioSession.java | 2 +- .../rubyeye/xmemcached/command/Command.java | 10 +++++++ .../xmemcached/impl/MemcachedHandler.java | 10 +++---- .../xmemcached/impl/MemcachedTCPSession.java | 10 ++++++- .../impl/MemcachedHandlerUnitTest.java | 28 +++++++++---------- 5 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/google/code/yanf4j/nio/impl/AbstractNioSession.java b/src/main/java/com/google/code/yanf4j/nio/impl/AbstractNioSession.java index 556cbc395..14169673f 100644 --- a/src/main/java/com/google/code/yanf4j/nio/impl/AbstractNioSession.java +++ b/src/main/java/com/google/code/yanf4j/nio/impl/AbstractNioSession.java @@ -95,7 +95,7 @@ protected void onWrite(SelectionKey key) { isLockedByMe = true; WriteMessage currentMessage = null; - // make read/write fail, write/read=3/2 + // make read/write fail, write/read=2/1 final long maxWritten = readBuffer.capacity() + readBuffer.capacity() >>> 1; try { long written = 0; diff --git a/src/main/java/net/rubyeye/xmemcached/command/Command.java b/src/main/java/net/rubyeye/xmemcached/command/Command.java index ffdd5d301..71abadf93 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/Command.java +++ b/src/main/java/net/rubyeye/xmemcached/command/Command.java @@ -39,6 +39,16 @@ public abstract class Command implements WriteMessage { public static final byte REQUEST_MAGIC_NUMBER = (byte) (0x80 & 0xFF); public static final byte RESPONSE_MAGIC_NUMBER = (byte) (0x81 & 0xFF); + + private boolean added; + + public boolean isAdded() { + return added; + } + + public void setAdded(boolean added) { + this.added = added; + } public final Object getMessage() { return this; diff --git a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedHandler.java b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedHandler.java index bff5370e0..311f58cb3 100644 --- a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedHandler.java +++ b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedHandler.java @@ -118,18 +118,18 @@ public void setEnableHeartBeat(boolean enableHeartBeat) { public final void onMessageSent(Session session, Object msg) { Command command = (Command) msg; command.setStatus(OperationStatus.SENT); - if (!command.isNoreply() - || this.client.getProtocol() == Protocol.Binary) { - ((MemcachedTCPSession) session).addCommand(command); - } // After message sent,we can set the buffer to be null for gc friendly. command.setIoBuffer(EMPTY_BUF); switch (command.getCommandType()) { + case ADD: + case APPEND: case SET: case SET_MANY: // After message sent,we can set the value to be null for gc // friendly. - ((StoreCommand) command).setValue(null); + if (command instanceof StoreCommand) { + ((StoreCommand) command).setValue(null); + } break; } } diff --git a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedTCPSession.java b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedTCPSession.java index 628f0e1d3..590f47d51 100644 --- a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedTCPSession.java +++ b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedTCPSession.java @@ -28,6 +28,7 @@ import net.rubyeye.xmemcached.exception.MemcachedException; import net.rubyeye.xmemcached.networking.MemcachedSession; import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper; +import net.rubyeye.xmemcached.utils.Protocol; import com.google.code.yanf4j.core.WriteMessage; import com.google.code.yanf4j.core.impl.FutureImpl; @@ -144,13 +145,20 @@ protected WriteMessage preprocessWriteMessage(WriteMessage writeMessage) { } if (currentCommand.getStatus() == OperationStatus.SENDING) { /** - * optimieze commands + * optimize commands */ currentCommand = this.optimiezer.optimize(currentCommand, this.writeQueue, this.commandAlreadySent, this.sendBufferSize); } + currentCommand.setStatus(OperationStatus.WRITING); + if ((!currentCommand.isNoreply() || this.commandFactory.getProtocol() == Protocol.Binary) + && !currentCommand.isAdded()) { + currentCommand.setAdded(true); + this.addCommand(currentCommand); + } + return currentCommand; } diff --git a/src/test/java/net/rubyeye/xmemcached/test/unittest/impl/MemcachedHandlerUnitTest.java b/src/test/java/net/rubyeye/xmemcached/test/unittest/impl/MemcachedHandlerUnitTest.java index 5cbfce754..13a9e20df 100644 --- a/src/test/java/net/rubyeye/xmemcached/test/unittest/impl/MemcachedHandlerUnitTest.java +++ b/src/test/java/net/rubyeye/xmemcached/test/unittest/impl/MemcachedHandlerUnitTest.java @@ -6,6 +6,7 @@ import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.command.Command; import net.rubyeye.xmemcached.command.CommandType; +import net.rubyeye.xmemcached.command.OperationStatus; import net.rubyeye.xmemcached.command.binary.BinaryStoreCommand; import net.rubyeye.xmemcached.command.binary.BinaryVersionCommand; import net.rubyeye.xmemcached.command.text.TextStoreCommand; @@ -16,6 +17,7 @@ import org.easymock.classextension.EasyMock; import org.easymock.classextension.IMocksControl; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,45 +43,43 @@ public void setUp() { public void testOnMessageSent_TextCommand() { Command cmd = new TextVersionCommand(new CountDownLatch(1), new InetSocketAddress(8080)); - this.session.addCommand(cmd); - EasyMock.expectLastCall(); this.mocksControl.replay(); this.handler.onMessageSent(this.session, cmd); this.mocksControl.verify(); + Assert.assertEquals(cmd.getStatus(), OperationStatus.SENT); } @Test public void testOnMessageSent_TextCommand_NoReply() { - Command cmd = new TextStoreCommand(null, null, CommandType.SET, null, 1, 1, null, - true, null); - EasyMock.expect(this.memcachedClient.getProtocol()).andReturn( - Protocol.Text); + TextStoreCommand cmd = new TextStoreCommand(null, null, + CommandType.SET, null, 1, 1, "hello", true, null); + Assert.assertEquals("hello", cmd.getValue()); this.mocksControl.replay(); this.handler.onMessageSent(this.session, cmd); this.mocksControl.verify(); + Assert.assertEquals(cmd.getStatus(), OperationStatus.SENT); + Assert.assertNull(cmd.getValue()); } @Test public void testOnMessageSent_BinaryCommand() { Command cmd = new BinaryVersionCommand(null, null); - this.session.addCommand(cmd); - EasyMock.expectLastCall(); this.mocksControl.replay(); this.handler.onMessageSent(this.session, cmd); this.mocksControl.verify(); + Assert.assertEquals(cmd.getStatus(), OperationStatus.SENT); } @Test public void testOnMessageSent_BinaryCommand_NoReply() { - Command cmd = new BinaryStoreCommand(null, null, CommandType.ADD, null, - 1, 1, null, true, null); - EasyMock.expect(this.memcachedClient.getProtocol()).andReturn( - Protocol.Binary); - this.session.addCommand(cmd); - EasyMock.expectLastCall(); + BinaryStoreCommand cmd = new BinaryStoreCommand(null, null, + CommandType.ADD, null, 1, 1, "hello", true, null); + Assert.assertEquals("hello", cmd.getValue()); this.mocksControl.replay(); this.handler.onMessageSent(this.session, cmd); this.mocksControl.verify(); + Assert.assertEquals(cmd.getStatus(), OperationStatus.SENT); + Assert.assertNull(cmd.getValue()); } }