Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compatibility with Gwhalin Memcached Java Client #60

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteOrder;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.networking.ClosedMemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

import com.google.code.yanf4j.core.Handler;
Expand All @@ -18,7 +17,7 @@
* @author dennis
*
*/
public class ClosedMemcachedTCPSession implements MemcachedSession {
public class ClosedMemcachedTCPSession implements ClosedMemcachedSession {
private InetSocketAddressWrapper inetSocketAddressWrapper;
private volatile boolean allowReconnect = true;

Expand All @@ -28,10 +27,6 @@ public ClosedMemcachedTCPSession(
this.inetSocketAddressWrapper = inetSocketAddressWrapper;
}

public void destroy() {

}

public InetSocketAddressWrapper getInetSocketAddressWrapper() {
return this.inetSocketAddressWrapper;
}
Expand All @@ -48,17 +43,10 @@ public boolean isAllowReconnect() {
return this.allowReconnect;
}

public void quit() {

}

public void setAllowReconnect(boolean allow) {
this.allowReconnect = allow;
}

public void setBufferAllocator(BufferAllocator allocator) {

}

public void clearAttributes() {

Expand Down Expand Up @@ -201,14 +189,6 @@ public void setUseBlockingRead(boolean useBlockingRead) {

}

public boolean isAuthFailed() {
return false;
}

public void setAuthFailed(boolean authFailed) {

}

public void setUseBlockingWrite(boolean useBlockingWrite) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,46 @@ public class KetamaMemcachedSessionLocator extends
*/
static final int DEFAULT_PORT = 11211;
private final boolean cwNginxUpstreamConsistent;
private final boolean gwhalinMemcachedJavaClientCompatibiltyConsistent;

public KetamaMemcachedSessionLocator() {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent,
boolean gwhalinMemcachedJavaClientCompatibiltyConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = gwhalinMemcachedJavaClientCompatibiltyConsistent;
}

public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
super();
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.gwhalinMemcachedJavaClientCompatibiltyConsistent = false;
this.buildMap(list, alg);
}

Expand All @@ -102,11 +116,20 @@ private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
sockStr = sockStr + ":" + serverAddress.getPort();
}
} else {
if (session instanceof MemcachedTCPSession) {
// Always use the first time resolved address.
sockStr = ((MemcachedTCPSession) session)
.getInetSocketAddressWrapper()
.getRemoteAddressStr();
if (session instanceof MemcachedSession) {
if (!gwhalinMemcachedJavaClientCompatibiltyConsistent) {
// Always use the first time resolved address.
sockStr = ((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getRemoteAddressStr();
} else {
sockStr = ((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getInetSocketAddress().getHostName() + ":" +
((MemcachedSession) session)
.getInetSocketAddressWrapper()
.getInetSocketAddress().getPort();
}
}
if (sockStr == null) {
sockStr = String.valueOf(session.getRemoteSocketAddress());
Expand All @@ -116,7 +139,7 @@ private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
* Duplicate 160 X weight references
*/
int numReps = NUM_REPS;
if (session instanceof MemcachedTCPSession) {
if (session instanceof MemcachedSession) {
numReps *= ((MemcachedSession) session).getWeight();
}
if (alg == HashAlgorithm.KETAMA_HASH) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.rubyeye.xmemcached.networking;

import com.google.code.yanf4j.core.Session;

import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

public interface ClosedMemcachedSession extends Session {

public void setAllowReconnect(boolean allow);

public boolean isAllowReconnect();

public InetSocketAddressWrapper getInetSocketAddressWrapper();

@Deprecated
public int getWeight();

@Deprecated
public int getOrder();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,19 @@
package net.rubyeye.xmemcached.networking;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

import com.google.code.yanf4j.core.Session;

/**
* Abstract interface for memcached connection.
*
* @author dennis
*
*/
public interface MemcachedSession extends Session {

public void setAllowReconnect(boolean allow);

public boolean isAllowReconnect();
public interface MemcachedSession extends ClosedMemcachedSession {

public void setBufferAllocator(BufferAllocator allocator);

public InetSocketAddressWrapper getInetSocketAddressWrapper();

public void destroy();

@Deprecated
public int getWeight();

@Deprecated
public int getOrder();

public void quit();

public boolean isAuthFailed();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package net.rubyeye.xmemcached.test.unittest;

import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;

public class MockMemcachedSession extends MockSession implements MemcachedSession {


public MockMemcachedSession(int port) {
super(port);
}


public void setAllowReconnect(boolean allow) {
// TODO Auto-generated method stub

}

public boolean isAllowReconnect() {
// TODO Auto-generated method stub
return false;
}

public void setBufferAllocator(BufferAllocator allocator) {
// TODO Auto-generated method stub

}

public InetSocketAddressWrapper getInetSocketAddressWrapper() {
InetSocketAddressWrapper inetSocketAddressWrapper = new InetSocketAddressWrapper(getRemoteSocketAddress(), 1, 1, null);
inetSocketAddressWrapper.setRemoteAddressStr("localhost/127.0.0.1:" + this.port);
return inetSocketAddressWrapper;
}

public void destroy() {
// TODO Auto-generated method stub

}

public int getWeight() {
// TODO Auto-generated method stub
return 1;
}

public int getOrder() {
// TODO Auto-generated method stub
return 0;
}

public void quit() {
// TODO Auto-generated method stub

}

public boolean isAuthFailed() {
// TODO Auto-generated method stub
return false;
}

public void setAuthFailed(boolean authFailed) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class MockSession implements Session {
private boolean closed = false;
private final int port;
protected final int port;

public MockSession(int port) {
this.port = port;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package net.rubyeye.xmemcached.test.unittest.impl;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;

import net.rubyeye.xmemcached.HashAlgorithm;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.test.unittest.MockMemcachedSession;

import org.junit.Before;
import org.junit.Test;

import com.google.code.yanf4j.core.Session;

public class KetamaMemcachedSessionLocatorGwhalinMemcachedJavaClientUnitTest extends
AbstractMemcachedSessionLocatorUnitTest {

@Before
public void setUp() {
this.locator = new KetamaMemcachedSessionLocator(HashAlgorithm.KETAMA_HASH, false, true);
}

@Test
public void testGetSessionByKey_MoreSessions() {
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
MockMemcachedSession session3 = new MockMemcachedSession(8082);
System.err.print(session1.getInetSocketAddressWrapper().getRemoteAddressStr());

List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

}

@Test
public void testGetSessionByKey_MoreSessions_OneClosed() {
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
session1.close();
MockMemcachedSession session3 = new MockMemcachedSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session2, this.locator.getSessionByKey("a3"));

}

@Test
public void testGetSessionByKey_MoreSessions_OneClosed_FailureMode() {
this.locator.setFailureMode(true);
MockMemcachedSession session1 = new MockMemcachedSession(8080);
MockMemcachedSession session2 = new MockMemcachedSession(8081);
session1.close();
MockMemcachedSession session3 = new MockMemcachedSession(8082);
List<Session> list = new ArrayList<Session>();
list.add(session1);
list.add(session2);
list.add(session3);
this.locator.updateSessions(list);
assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));

assertSame(session2, this.locator.getSessionByKey("a1"));
assertSame(session3, this.locator.getSessionByKey("a2"));
assertSame(session1, this.locator.getSessionByKey("a3"));
}

}