Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for AWS Elasticache Autodiscovery #50

Merged
merged 13 commits into from
Feb 27, 2017
Merged
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
<version>2.2.1-SNAPSHOT</version>
<version>2.3.0-SNAPSHOT</version>
<name>xmemcached</name>
<description>Extreme performance modern memcached client for java</description>
<url>https://github.com/killme2008/xmemcached</url>
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/net/rubyeye/xmemcached/CommandFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public Command createAuthStepCommand(String mechanism,
* @since 1.3.3
* @param key
* @param keyBytes
* @param latch TODO
* @param latch
* TODO
* @param exp
* @param noreply
* @return
Expand All @@ -281,13 +282,29 @@ public Command createTouchCommand(final String key, final byte[] keyBytes,
* @since 1.3.3
* @param key
* @param keyBytes
* @param latch TODO
* @param latch
* TODO
* @param exp
* @param noreply
* @return
*/
public Command createGetAndTouchCommand(final String key,
final byte[] keyBytes, CountDownLatch latch, int exp, boolean noreply);
final byte[] keyBytes, CountDownLatch latch, int exp,
boolean noreply);

/**
* Create a AWS ElasticCache config command, only supports Cache Engine
* Version 1.4.14 or Higher.
*
* @see <a
* href="http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/AutoDiscovery.AddingToYourClientLibrary.html">Adding
* Auto Discovery To Your Client Library</a>
* @param subCommand
* @param key
* @return
*/
public Command createAWSElasticCacheConfigCommand(String subCommand,
String key);

/**
* Get this client's protocol version
Expand Down
113 changes: 69 additions & 44 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient
private boolean sanitizeKeys;
private MemcachedHandler memcachedHandler;
protected CommandFactory commandFactory;
private long opTimeout = DEFAULT_OP_TIMEOUT;
protected long opTimeout = DEFAULT_OP_TIMEOUT;
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
protected int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE;
protected int maxQueuedNoReplyOperations = DEFAULT_MAX_QUEUED_NOPS;
Expand Down Expand Up @@ -319,7 +319,7 @@ private final <T> GetsResponse<T> gets0(final String key,
return result;
}

private final Session sendCommand(final Command cmd)
protected final Session sendCommand(final Command cmd)
throws MemcachedException {
if (this.shutdown) {
throw new MemcachedException("Xmemcached is stopped");
Expand Down Expand Up @@ -534,53 +534,51 @@ public final void removeServer(String hostList) {
List<InetSocketAddress> addresses = AddrUtil.getAddresses(hostList);
if (addresses != null && addresses.size() > 0) {
for (InetSocketAddress address : addresses) {
// Close main sessions
Queue<Session> sessionQueue = this.connector
.getSessionByAddress(address);
if (sessionQueue != null) {
for (Session session : sessionQueue) {
if (session != null) {
// Disable auto reconnection
((MemcachedSession) session)
.setAllowReconnect(false);
// Close connection
((MemcachedSession) session).quit();
}
}
}
// Close standby sessions
List<Session> standBySession = this.connector
.getStandbySessionListByMainNodeAddr(address);
if (standBySession != null) {
for (Session session : standBySession) {
if (session != null) {
this.connector.removeReconnectRequest(session
.getRemoteSocketAddress());
// Disable auto reconnection
((MemcachedSession) session)
.setAllowReconnect(false);
// Close connection
((MemcachedSession) session).quit();
}
}
}
this.connector.removeReconnectRequest(address);
removeAddr(address);
}

}

}

protected void checkSocketAddress(InetSocketAddress address) {

protected void removeAddr(InetSocketAddress address) {
// Close main sessions
Queue<Session> sessionQueue = this.connector
.getSessionByAddress(address);
if (sessionQueue != null) {
for (Session session : sessionQueue) {
if (session != null) {
// Disable auto reconnection
((MemcachedSession) session).setAllowReconnect(false);
// Close connection
((MemcachedSession) session).quit();
}
}
}
// Close standby sessions
List<Session> standBySession = this.connector
.getStandbySessionListByMainNodeAddr(address);
if (standBySession != null) {
for (Session session : standBySession) {
if (session != null) {
this.connector.removeReconnectRequest(session
.getRemoteSocketAddress());
// Disable auto reconnection
((MemcachedSession) session).setAllowReconnect(false);
// Close connection
((MemcachedSession) session).quit();
}
}
}
this.connector.removeReconnectRequest(address);
}

private void connect(final InetSocketAddressWrapper inetSocketAddressWrapper)
protected void connect(
final InetSocketAddressWrapper inetSocketAddressWrapper)
throws IOException {
// creat connection pool
InetSocketAddress inetSocketAddress = inetSocketAddressWrapper
.getInetSocketAddress();
this.checkSocketAddress(inetSocketAddress);
if (this.connectionPoolSize > 1) {
log.warn("You are using connection pool for xmemcached client,it's not recommended unless you have test it that it can boost performance in your app.");
}
Expand Down Expand Up @@ -804,25 +802,33 @@ public final void setBufferAllocator(final BufferAllocator bufferAllocator) {
* @throws IOException
*/
public XMemcachedClient(final InetSocketAddress inetSocketAddress,
int weight) throws IOException {
int weight, CommandFactory cmdFactory) throws IOException {
super();
if (inetSocketAddress == null) {
throw new IllegalArgumentException("Null InetSocketAddress");

}
if (cmdFactory == null) {
throw new IllegalArgumentException("Null command factory.");
}
if (weight <= 0) {
throw new IllegalArgumentException("weight<=0");
}
this.buildConnector(new ArrayMemcachedSessionLocator(),
new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(),
new TextCommandFactory(), new SerializingTranscoder());
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory,
new SerializingTranscoder());
this.start0();
this.connect(new InetSocketAddressWrapper(inetSocketAddress,
this.serverOrderCount.incrementAndGet(), weight, null));
}

public XMemcachedClient(final InetSocketAddress inetSocketAddress,
int weight) throws IOException {
this(inetSocketAddress, weight, new TextCommandFactory());
}

public XMemcachedClient(final InetSocketAddress inetSocketAddress)
throws IOException {
this(inetSocketAddress, 1);
Expand Down Expand Up @@ -999,16 +1005,33 @@ private final boolean isWindowsPlatform() {
*/
public XMemcachedClient(List<InetSocketAddress> addressList)
throws IOException {
this(addressList, new TextCommandFactory());
}

/**
* XMemcached Constructor.Every server's weight is one by default.
*
* @param cmdFactory
* command factory
* @param addressList
* memcached server socket address list.
* @throws IOException
*/
public XMemcachedClient(List<InetSocketAddress> addressList,
CommandFactory cmdFactory) throws IOException {
super();
if (cmdFactory == null) {
throw new IllegalArgumentException("Null command factory.");
}
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Empty address list");
}
BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
this.buildConnector(new ArrayMemcachedSessionLocator(),
simpleBufferAllocator,
XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(),
new TextCommandFactory(), new SerializingTranscoder());
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory,
new SerializingTranscoder());
this.start0();
for (InetSocketAddress inetSocketAddress : addressList) {
this.connect(new InetSocketAddressWrapper(inetSocketAddress,
Expand Down Expand Up @@ -1926,7 +1949,8 @@ private boolean delete0(String key, final int time, long cas,
return (Boolean) command.getResult();
}

void checkException(final Command command) throws MemcachedException {
protected void checkException(final Command command)
throws MemcachedException {
if (command.getException() != null) {
if (command.getException() instanceof MemcachedException) {
throw (MemcachedException) command.getException();
Expand Down Expand Up @@ -2404,6 +2428,7 @@ public final void shutdown() throws IOException {
return;
}
this.shutdown = true;
this.connector.shuttingDown();
this.connector.quitAllSessions();
this.connector.stop();
this.memcachedHandler.stop();
Expand Down Expand Up @@ -2523,7 +2548,7 @@ private final <T> boolean sendStoreCommand(Command command, long timeout)

private static final String CONTINUOUS_TIMEOUT_COUNTER = "ContinuousTimeouts";

private void latchWait(final Command cmd, final long timeout,
protected void latchWait(final Command cmd, final long timeout,
final Session session) throws InterruptedException,
TimeoutException {
if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
Expand Down
Loading