Skip to content

Commit

Permalink
Merge pull request #60 from lucaspouzac/memcached-java-client_compati…
Browse files Browse the repository at this point in the history
…bility

Add compatibility with Gwhalin Memcached Java Client
  • Loading branch information
killme2008 authored Jun 21, 2017
2 parents 7ac7894 + bd65736 commit b56e7ec
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteOrder;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.networking.ClosedMemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

import com.google.code.yanf4j.core.Handler;
Expand All @@ -18,7 +17,7 @@
* @author dennis
*
*/
public class ClosedMemcachedTCPSession implements MemcachedSession {
public class ClosedMemcachedTCPSession implements ClosedMemcachedSession {
private InetSocketAddressWrapper inetSocketAddressWrapper;
private volatile boolean allowReconnect = true;

Expand All @@ -28,10 +27,6 @@ public ClosedMemcachedTCPSession(
this.inetSocketAddressWrapper = inetSocketAddressWrapper;
}

public void destroy() {

}

public InetSocketAddressWrapper getInetSocketAddressWrapper() {
return this.inetSocketAddressWrapper;
}
Expand All @@ -48,17 +43,10 @@ public boolean isAllowReconnect() {
return this.allowReconnect;
}

public void quit() {

}

public void setAllowReconnect(boolean allow) {
this.allowReconnect = allow;
}

public void setBufferAllocator(BufferAllocator allocator) {

}

public void clearAttributes() {

Expand Down Expand Up @@ -201,14 +189,6 @@ public void setUseBlockingRead(boolean useBlockingRead) {

}

public boolean isAuthFailed() {
return false;
}

public void setAuthFailed(boolean authFailed) {

}

public void setUseBlockingWrite(boolean useBlockingWrite) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,46 @@ public class KetamaMemcachedSessionLocator extends
*/
static final int DEFAULT_PORT = 11211;
private final boolean cwNginxUpstreamConsistent;
private final boolean gwhalinMemcachedJavaClientCompatibiltyConsistent;

public KetamaMemcachedSessionLocator() {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent,
boolean gwhalinMemcachedJavaClientCompatibiltyConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = gwhalinMemcachedJavaClientCompatibiltyConsistent;
}

public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
super();
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
this.buildMap(list, alg);
}

Expand All @@ -102,11 +116,20 @@ private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
sockStr = sockStr + ":" + serverAddress.getPort();
}
} else {
if (session instanceof MemcachedTCPSession) {
// Always use the first time resolved address.
sockStr = ((MemcachedTCPSession) session)
.getInetSocketAddressWrapper()
.getRemoteAddressStr();
if (session instanceof MemcachedSession) {
if (!gwhalinMemcachedJavaClientCompatibiltyConsistent) {
// Always use the first time resolved address.
sockStr = ((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getRemoteAddressStr();
} else {
sockStr = ((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getInetSocketAddress().getHostName() + ":" +
((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getInetSocketAddress().getPort();
}
}
if (sockStr == null) {
sockStr = String.valueOf(session.getRemoteSocketAddress());
Expand All @@ -116,7 +139,7 @@ private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
* Duplicate 160 X weight references
*/
int numReps = NUM_REPS;
if (session instanceof MemcachedTCPSession) {
if (session instanceof MemcachedSession) {
numReps *= ((MemcachedSession) session).getWeight();
}
if (alg == HashAlgorithm.KETAMA_HASH) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.rubyeye.xmemcached.networking;

import com.google.code.yanf4j.core.Session;

import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

public interface ClosedMemcachedSession extends Session {

public void setAllowReconnect(boolean allow);

public boolean isAllowReconnect();

public InetSocketAddressWrapper getInetSocketAddressWrapper();

@Deprecated
public int getWeight();

@Deprecated
public int getOrder();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,19 @@
package net.rubyeye.xmemcached.networking;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

import com.google.code.yanf4j.core.Session;

/**
* Abstract interface for memcached connection.
*
* @author dennis
*
*/
public interface MemcachedSession extends Session {

public void setAllowReconnect(boolean allow);

public boolean isAllowReconnect();
public interface MemcachedSession extends ClosedMemcachedSession {

public void setBufferAllocator(BufferAllocator allocator);

public InetSocketAddressWrapper getInetSocketAddressWrapper();

public void destroy();

@Deprecated
public int getWeight();

@Deprecated
public int getOrder();

public void quit();

public boolean isAuthFailed();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package net.rubyeye.xmemcached.test.unittest;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

public class MockMemcachedSession extends MockSession implements MemcachedSession {


public MockMemcachedSession(int port) {
super(port);
}


public void setAllowReconnect(boolean allow) {
// TODO Auto-generated method stub

}

public boolean isAllowReconnect() {
// TODO Auto-generated method stub
return false;
}

public void setBufferAllocator(BufferAllocator allocator) {
// TODO Auto-generated method stub

}

public InetSocketAddressWrapper getInetSocketAddressWrapper() {
InetSocketAddressWrapper inetSocketAddressWrapper = new InetSocketAddressWrapper(getRemoteSocketAddress(), 1, 1, null);
inetSocketAddressWrapper.setRemoteAddressStr("localhost/127.0.0.1:" + this.port);
return inetSocketAddressWrapper;
}

public void destroy() {
// TODO Auto-generated method stub

}

public int getWeight() {
// TODO Auto-generated method stub
return 1;
}

public int getOrder() {
// TODO Auto-generated method stub
return 0;
}

public void quit() {
// TODO Auto-generated method stub

}

public boolean isAuthFailed() {
// TODO Auto-generated method stub
return false;
}

public void setAuthFailed(boolean authFailed) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class MockSession implements Session {
private boolean closed = false;
private final int port;
protected final int port;

public MockSession(int port) {
this.port = port;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package net.rubyeye.xmemcached.test.unittest.impl;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;

import net.rubyeye.xmemcached.HashAlgorithm;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.test.unittest.MockMemcachedSession;

import org.junit.Before;
import org.junit.Test;

import com.google.code.yanf4j.core.Session;

public class KetamaMemcachedSessionLocatorGwhalinMemcachedJavaClientUnitTest extends
AbstractMemcachedSessionLocatorUnitTest {

@Before
public void setUp() {
this.locator = new KetamaMemcachedSessionLocator(HashAlgorithm.KETAMA_HASH, false, true);
}

@Test
public void testGetSessionByKey_MoreSessions() {
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
MockMemcachedSession session3 = new MockMemcachedSession(8082);
System.err.print(session1.getInetSocketAddressWrapper().getRemoteAddressStr());

List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

}

@Test
public void testGetSessionByKey_MoreSessions_OneClosed() {
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
session1.close();
MockMemcachedSession session3 = new MockMemcachedSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

}

@Test
public void testGetSessionByKey_MoreSessions_OneClosed_FailureMode() {
this.locator.setFailureMode(true);
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
session1.close();
MockMemcachedSession session3 = new MockMemcachedSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);
assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));
}

}

0 comments on commit b56e7ec

Please sign in to comment.