diff --git a/pom.xml b/pom.xml index bf1c61e2d..ad1ff2d31 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ 4.0.0 com.googlecode.xmemcached xmemcached - 2.2.1-SNAPSHOT + 2.3.0-SNAPSHOT xmemcached Extreme performance modern memcached client for java https://github.com/killme2008/xmemcached diff --git a/src/main/java/net/rubyeye/xmemcached/CommandFactory.java b/src/main/java/net/rubyeye/xmemcached/CommandFactory.java index 8dc10c9f9..dc9440d52 100644 --- a/src/main/java/net/rubyeye/xmemcached/CommandFactory.java +++ b/src/main/java/net/rubyeye/xmemcached/CommandFactory.java @@ -267,7 +267,8 @@ public Command createAuthStepCommand(String mechanism, * @since 1.3.3 * @param key * @param keyBytes - * @param latch TODO + * @param latch + * TODO * @param exp * @param noreply * @return @@ -281,13 +282,29 @@ public Command createTouchCommand(final String key, final byte[] keyBytes, * @since 1.3.3 * @param key * @param keyBytes - * @param latch TODO + * @param latch + * TODO * @param exp * @param noreply * @return */ public Command createGetAndTouchCommand(final String key, - final byte[] keyBytes, CountDownLatch latch, int exp, boolean noreply); + final byte[] keyBytes, CountDownLatch latch, int exp, + boolean noreply); + + /** + * Create a AWS ElasticCache config command, only supports Cache Engine + * Version 1.4.14 or Higher. + * + * @see Adding + * Auto Discovery To Your Client Library + * @param subCommand + * @param key + * @return + */ + public Command createAWSElasticCacheConfigCommand(String subCommand, + String key); /** * Get this client's protocol version diff --git a/src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java b/src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java index 78cf11d18..946828509 100644 --- a/src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java +++ b/src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java @@ -93,7 +93,7 @@ public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient private boolean sanitizeKeys; private MemcachedHandler memcachedHandler; protected CommandFactory commandFactory; - private long opTimeout = DEFAULT_OP_TIMEOUT; + protected long opTimeout = DEFAULT_OP_TIMEOUT; private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; protected int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE; protected int maxQueuedNoReplyOperations = DEFAULT_MAX_QUEUED_NOPS; @@ -319,7 +319,7 @@ private final GetsResponse gets0(final String key, return result; } - private final Session sendCommand(final Command cmd) + protected final Session sendCommand(final Command cmd) throws MemcachedException { if (this.shutdown) { throw new MemcachedException("Xmemcached is stopped"); @@ -534,53 +534,51 @@ public final void removeServer(String hostList) { List addresses = AddrUtil.getAddresses(hostList); if (addresses != null && addresses.size() > 0) { for (InetSocketAddress address : addresses) { - // Close main sessions - Queue sessionQueue = this.connector - .getSessionByAddress(address); - if (sessionQueue != null) { - for (Session session : sessionQueue) { - if (session != null) { - // Disable auto reconnection - ((MemcachedSession) session) - .setAllowReconnect(false); - // Close connection - ((MemcachedSession) session).quit(); - } - } - } - // Close standby sessions - List standBySession = this.connector - .getStandbySessionListByMainNodeAddr(address); - if (standBySession != null) { - for (Session session : standBySession) { - if (session != null) { - this.connector.removeReconnectRequest(session - .getRemoteSocketAddress()); - // Disable auto reconnection - ((MemcachedSession) session) - .setAllowReconnect(false); - // Close connection - ((MemcachedSession) session).quit(); - } - } - } - this.connector.removeReconnectRequest(address); + removeAddr(address); } } } - protected void checkSocketAddress(InetSocketAddress address) { - + protected void removeAddr(InetSocketAddress address) { + // Close main sessions + Queue sessionQueue = this.connector + .getSessionByAddress(address); + if (sessionQueue != null) { + for (Session session : sessionQueue) { + if (session != null) { + // Disable auto reconnection + ((MemcachedSession) session).setAllowReconnect(false); + // Close connection + ((MemcachedSession) session).quit(); + } + } + } + // Close standby sessions + List standBySession = this.connector + .getStandbySessionListByMainNodeAddr(address); + if (standBySession != null) { + for (Session session : standBySession) { + if (session != null) { + this.connector.removeReconnectRequest(session + .getRemoteSocketAddress()); + // Disable auto reconnection + ((MemcachedSession) session).setAllowReconnect(false); + // Close connection + ((MemcachedSession) session).quit(); + } + } + } + this.connector.removeReconnectRequest(address); } - private void connect(final InetSocketAddressWrapper inetSocketAddressWrapper) + protected void connect( + final InetSocketAddressWrapper inetSocketAddressWrapper) throws IOException { // creat connection pool InetSocketAddress inetSocketAddress = inetSocketAddressWrapper .getInetSocketAddress(); - this.checkSocketAddress(inetSocketAddress); if (this.connectionPoolSize > 1) { log.warn("You are using connection pool for xmemcached client,it's not recommended unless you have test it that it can boost performance in your app."); } @@ -804,25 +802,33 @@ public final void setBufferAllocator(final BufferAllocator bufferAllocator) { * @throws IOException */ public XMemcachedClient(final InetSocketAddress inetSocketAddress, - int weight) throws IOException { + int weight, CommandFactory cmdFactory) throws IOException { super(); if (inetSocketAddress == null) { throw new IllegalArgumentException("Null InetSocketAddress"); } + if (cmdFactory == null) { + throw new IllegalArgumentException("Null command factory."); + } if (weight <= 0) { throw new IllegalArgumentException("weight<=0"); } this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(), - XMemcachedClientBuilder.getDefaultSocketOptions(), - new TextCommandFactory(), new SerializingTranscoder()); + XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, + new SerializingTranscoder()); this.start0(); this.connect(new InetSocketAddressWrapper(inetSocketAddress, this.serverOrderCount.incrementAndGet(), weight, null)); } + public XMemcachedClient(final InetSocketAddress inetSocketAddress, + int weight) throws IOException { + this(inetSocketAddress, weight, new TextCommandFactory()); + } + public XMemcachedClient(final InetSocketAddress inetSocketAddress) throws IOException { this(inetSocketAddress, 1); @@ -999,7 +1005,24 @@ private final boolean isWindowsPlatform() { */ public XMemcachedClient(List addressList) throws IOException { + this(addressList, new TextCommandFactory()); + } + + /** + * XMemcached Constructor.Every server's weight is one by default. + * + * @param cmdFactory + * command factory + * @param addressList + * memcached server socket address list. + * @throws IOException + */ + public XMemcachedClient(List addressList, + CommandFactory cmdFactory) throws IOException { super(); + if (cmdFactory == null) { + throw new IllegalArgumentException("Null command factory."); + } if (addressList == null || addressList.isEmpty()) { throw new IllegalArgumentException("Empty address list"); } @@ -1007,8 +1030,8 @@ public XMemcachedClient(List addressList) this.buildConnector(new ArrayMemcachedSessionLocator(), simpleBufferAllocator, XMemcachedClientBuilder.getDefaultConfiguration(), - XMemcachedClientBuilder.getDefaultSocketOptions(), - new TextCommandFactory(), new SerializingTranscoder()); + XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, + new SerializingTranscoder()); this.start0(); for (InetSocketAddress inetSocketAddress : addressList) { this.connect(new InetSocketAddressWrapper(inetSocketAddress, @@ -1926,7 +1949,8 @@ private boolean delete0(String key, final int time, long cas, return (Boolean) command.getResult(); } - void checkException(final Command command) throws MemcachedException { + protected void checkException(final Command command) + throws MemcachedException { if (command.getException() != null) { if (command.getException() instanceof MemcachedException) { throw (MemcachedException) command.getException(); @@ -2404,6 +2428,7 @@ public final void shutdown() throws IOException { return; } this.shutdown = true; + this.connector.shuttingDown(); this.connector.quitAllSessions(); this.connector.stop(); this.memcachedHandler.stop(); @@ -2523,7 +2548,7 @@ private final boolean sendStoreCommand(Command command, long timeout) private static final String CONTINUOUS_TIMEOUT_COUNTER = "ContinuousTimeouts"; - private void latchWait(final Command cmd, final long timeout, + protected void latchWait(final Command cmd, final long timeout, final Session session) throws InterruptedException, TimeoutException { if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) { diff --git a/src/main/java/net/rubyeye/xmemcached/aws/AWSElasticCacheClient.java b/src/main/java/net/rubyeye/xmemcached/aws/AWSElasticCacheClient.java new file mode 100644 index 000000000..9def16f96 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/AWSElasticCacheClient.java @@ -0,0 +1,257 @@ +package net.rubyeye.xmemcached.aws; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.code.yanf4j.core.Session; + +import net.rubyeye.xmemcached.CommandFactory; +import net.rubyeye.xmemcached.XMemcachedClient; +import net.rubyeye.xmemcached.command.Command; +import net.rubyeye.xmemcached.command.TextCommandFactory; +import net.rubyeye.xmemcached.exception.MemcachedException; +import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper; + +/** + * AWS ElasticCache Client. + * + * @since 2.3.0 + * @author dennis + * + */ +public class AWSElasticCacheClient extends XMemcachedClient implements + ConfigUpdateListener { + + private static final Logger log = LoggerFactory + .getLogger(AWSElasticCacheClient.class); + + private boolean firstTimeUpdate = true; + + private List configAddrs = new ArrayList(); + + public synchronized void onUpdate(ClusterConfigration config) { + + if (firstTimeUpdate) { + firstTimeUpdate = false; + removeConfigAddrs(); + } + + List oldList = this.currentClusterConfiguration != null ? this.currentClusterConfiguration + .getNodeList() : Collections.EMPTY_LIST; + List newList = config.getNodeList(); + + List addNodes = new ArrayList(); + List removeNodes = new ArrayList(); + + for (CacheNode node : newList) { + if (!oldList.contains(node)) { + addNodes.add(node); + } + } + + for (CacheNode node : oldList) { + if (!newList.contains(node)) { + removeNodes.add(node); + } + } + + // Begin to update server list + for (CacheNode node : addNodes) { + try { + this.connect(new InetSocketAddressWrapper(node + .getInetSocketAddress(), this.configPoller + .getCacheNodeOrder(node), 1, null)); + } catch (IOException e) { + log.error("Connect to " + node + "failed.", e); + } + } + + for (CacheNode node : removeNodes) { + try { + this.removeAddr(node.getInetSocketAddress()); + } catch (Exception e) { + log.error("Remove " + node + " failed."); + } + } + + this.currentClusterConfiguration = config; + } + + private void removeConfigAddrs() { + for (InetSocketAddress configAddr : this.configAddrs) { + this.removeAddr(configAddr); + while (this.getConnector().getSessionByAddress(configAddr) != null + && this.getConnector().getSessionByAddress(configAddr) + .size() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + private final ConfigurationPoller configPoller; + + /** + * Default elasticcache configuration poll interval, it's one minute. + */ + public static final long DEFAULT_POLL_CONFIG_INTERVAL_MS = 60000; + + /** + * Construct an AWSElasticCacheClient instance with one config address and + * default poll interval. + * + * @since 2.3.0 + * @param addr + * config server address. + * @throws IOException + */ + public AWSElasticCacheClient(InetSocketAddress addr) throws IOException { + this(addr, DEFAULT_POLL_CONFIG_INTERVAL_MS); + } + + /** + * Construct an AWSElasticCacheClient instance with one config address and + * poll interval. + * + * @since 2.3.0 + * @param addr + * config server address. + * @param pollConfigIntervalMills + * config poll interval in milliseconds. + * @throws IOException + */ + public AWSElasticCacheClient(InetSocketAddress addr, + long pollConfigIntervalMills) throws IOException { + this(addr, pollConfigIntervalMills, new TextCommandFactory()); + } + + public AWSElasticCacheClient(InetSocketAddress addr, + long pollConfigIntervalMills, CommandFactory cmdFactory) + throws IOException { + this(asList(addr), pollConfigIntervalMills, cmdFactory); + } + + private static List asList(InetSocketAddress addr) { + List addrs = new ArrayList(); + addrs.add(addr); + return addrs; + } + + /** + * Construct an AWSElasticCacheClient instance with config server addresses + * and default config poll interval. + * + * @since 2.3.0 + * @param addrs + * config server list. + * @throws IOException + */ + public AWSElasticCacheClient(List addrs) + throws IOException { + this(addrs, DEFAULT_POLL_CONFIG_INTERVAL_MS); + } + + /** + * Construct an AWSElasticCacheClient instance with config server addresses. + * + * @since 2.3.0 + * @param addrs + * @param pollConfigIntervalMills + * @throws IOException + */ + public AWSElasticCacheClient(List addrs, + long pollConfigIntervalMills) throws IOException { + this(addrs, pollConfigIntervalMills, new TextCommandFactory()); + } + + /** + * Construct an AWSElasticCacheClient instance with config server addresses. + * + * @since 2.3.0 + * @param addrs + * config server list. + * @param pollConfigIntervalMills + * config poll interval in milliseconds. + * @param commandFactory + * protocol command factory. + * @throws IOException + */ + public AWSElasticCacheClient(List addrs, + long pollConfigIntervalMills, CommandFactory commandFactory) + throws IOException { + super(addrs, commandFactory); + if (pollConfigIntervalMills <= 0) { + throw new IllegalArgumentException( + "Invalid pollConfigIntervalMills value."); + } + // Use failure mode by default. + this.commandFactory = commandFactory; + this.setFailureMode(true); + this.configAddrs = addrs; + this.configPoller = new ConfigurationPoller(this, + pollConfigIntervalMills); + // Run at once to get config at startup. + // It will call onUpdate in the same thread. + this.configPoller.run(); + if (this.currentClusterConfiguration == null) { + throw new IllegalStateException( + "Retrieve ElasticCache config from `" + addrs.toString() + + "` failed."); + } + this.configPoller.start(); + } + + private volatile ClusterConfigration currentClusterConfiguration; + + /** + * Get cluster config from cache node by network command. + * + * @return + */ + public ClusterConfigration getConfig() throws MemcachedException, + InterruptedException, TimeoutException { + return this.getConfig("cluster"); + } + + /** + * Get config by key from cache node by network command. + * + * @since 2.3.0 + * @return clusetr config. + */ + public ClusterConfigration getConfig(String key) throws MemcachedException, + InterruptedException, TimeoutException { + Command cmd = this.commandFactory.createAWSElasticCacheConfigCommand( + "get", key); + final Session session = this.sendCommand(cmd); + this.latchWait(cmd, opTimeout, session); + cmd.getIoBuffer().free(); + this.checkException(cmd); + String result = (String) cmd.getResult(); + if (result == null) { + throw new MemcachedException( + "Operation fail,may be caused by networking or timeout"); + } + return AWSUtils.parseConfiguration(result); + } + + /** + * Get the current using configuration in memory. + * + * @since 2.3.0 + * @return current cluster config. + */ + public ClusterConfigration getCurrentConfig() { + return this.currentClusterConfiguration; + } +} diff --git a/src/main/java/net/rubyeye/xmemcached/aws/AWSUtils.java b/src/main/java/net/rubyeye/xmemcached/aws/AWSUtils.java new file mode 100644 index 000000000..a82b26484 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/AWSUtils.java @@ -0,0 +1,62 @@ +package net.rubyeye.xmemcached.aws; + +import java.util.ArrayList; +import java.util.List; + +import net.rubyeye.xmemcached.utils.ByteUtils; + +/** + * AWS get config command + * + * @author dennis + * + */ +public class AWSUtils { + + private static final String DELIMITER = "|"; + + /** + * Parse response string to ClusterConfiguration instance. + * + * @param line + * @return + */ + public static ClusterConfigration parseConfiguration(String line) { + String[] lines = line.trim().split("(?:\\r?\\n)"); + if (lines.length < 2) { + throw new IllegalArgumentException("Incorrect config response:" + + line); + } + String configversion = lines[0]; + String nodeListStr = lines[1]; + if (!ByteUtils.isNumber(configversion)) { + throw new IllegalArgumentException("Invalid configversion: " + + configversion + ", it should be a number."); + } + String[] nodeStrs = nodeListStr.split("(?:\\s)+"); + int version = Integer.parseInt(configversion); + List nodeList = new ArrayList(nodeStrs.length); + for (String nodeStr : nodeStrs) { + if (nodeStr.equals("")) { + continue; + } + + int firstDelimiter = nodeStr.indexOf(DELIMITER); + int secondDelimiter = nodeStr.lastIndexOf(DELIMITER); + if (firstDelimiter < 1 || firstDelimiter == secondDelimiter) { + throw new IllegalArgumentException("Invalid server ''" + + nodeStr + "'' in response: " + line); + } + String hostName = nodeStr.substring(0, firstDelimiter).trim(); + String ipAddress = nodeStr.substring(firstDelimiter + 1, + secondDelimiter).trim(); + String portNum = nodeStr.substring(secondDelimiter + 1).trim(); + int port = Integer.parseInt(portNum); + nodeList.add(new CacheNode(hostName, ipAddress, port)); + } + + return new ClusterConfigration(version, nodeList); + + } + +} diff --git a/src/main/java/net/rubyeye/xmemcached/aws/CacheNode.java b/src/main/java/net/rubyeye/xmemcached/aws/CacheNode.java new file mode 100644 index 000000000..4688edcc8 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/CacheNode.java @@ -0,0 +1,97 @@ +package net.rubyeye.xmemcached.aws; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import net.rubyeye.xmemcached.utils.ByteUtils; + +/** + * AWS ElasticCache Node information. + * + * @author dennis + * + */ +public class CacheNode implements Serializable { + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((hostName == null) ? 0 : hostName.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CacheNode other = (CacheNode) obj; + if (hostName == null) { + if (other.hostName != null) + return false; + } else if (!hostName.equals(other.hostName)) + return false; + if (port != other.port) + return false; + return true; + } + + private static final long serialVersionUID = -2999058612548153786L; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public InetSocketAddress getInetSocketAddress() { + return new InetSocketAddress(hostName, port); + } + + public String getIpAddress() { + return ipAddress; + } + + public void setIpAddress(String ipAddress) { + this.ipAddress = ipAddress; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String toString() { + return "[" + this.hostName + "|" + this.ipAddress + "|" + this.port + + "]"; + } + + public String getCacheKey() { + return this.hostName + ":" + this.port; + } + + private String hostName; + private String ipAddress; + private int port; + + public CacheNode(String hostName, String ipAddress, int port) { + super(); + this.hostName = hostName; + this.ipAddress = ipAddress; + this.port = port; + } + +} diff --git a/src/main/java/net/rubyeye/xmemcached/aws/ClusterConfigration.java b/src/main/java/net/rubyeye/xmemcached/aws/ClusterConfigration.java new file mode 100644 index 000000000..27f09eae1 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/ClusterConfigration.java @@ -0,0 +1,53 @@ +package net.rubyeye.xmemcached.aws; + +import java.io.Serializable; +import java.util.List; + +/** + * Cluster configuration retrieved from ElasticCache. + * + * @author dennis + * + */ +public class ClusterConfigration implements Serializable { + + private static final long serialVersionUID = 6809891639636689050L; + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + public List getNodeList() { + return nodeList; + } + + public void setNodeList(List nodeList) { + this.nodeList = nodeList; + } + + private int version; + private List nodeList; + + public ClusterConfigration(int version, List nodeList) { + super(); + this.version = version; + this.nodeList = nodeList; + } + + public ClusterConfigration() { + super(); + } + + public String toString() { + StringBuilder nodeList = new StringBuilder("{ Version: " + version + + ", CacheNode List: "); + nodeList.append(this.nodeList); + nodeList.append("}"); + + return nodeList.toString(); + } +} diff --git a/src/main/java/net/rubyeye/xmemcached/aws/ConfigUpdateListener.java b/src/main/java/net/rubyeye/xmemcached/aws/ConfigUpdateListener.java new file mode 100644 index 000000000..9fbeeb07e --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/ConfigUpdateListener.java @@ -0,0 +1,18 @@ +package net.rubyeye.xmemcached.aws; + +/** + * AWS ElasticCache config update event listener. + * + * @author dennis + * + */ +public interface ConfigUpdateListener { + + /** + * Called when config is changed. + * + * @param config + * the new config + */ + public void onUpdate(ClusterConfigration config); +} diff --git a/src/main/java/net/rubyeye/xmemcached/aws/ConfigurationPoller.java b/src/main/java/net/rubyeye/xmemcached/aws/ConfigurationPoller.java new file mode 100644 index 000000000..31538b552 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/aws/ConfigurationPoller.java @@ -0,0 +1,117 @@ +package net.rubyeye.xmemcached.aws; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AWS ElastiCache configuration poller + * + * @author dennis + * + */ +public class ConfigurationPoller implements Runnable { + + /** + * Return current ClusterConfigration. + * + * @return + */ + public ClusterConfigration getClusterConfiguration() { + return clusterConfigration; + } + + private final AtomicInteger serverOrderCounter = new AtomicInteger(0); + + private Map ordersMap = new HashMap(); + + public synchronized int getCacheNodeOrder(CacheNode node) { + Integer order = this.ordersMap.get(node.getCacheKey()); + if (order != null) { + return order; + } + order = this.serverOrderCounter.incrementAndGet(); + this.ordersMap.put(node.getCacheKey(), order); + return order; + } + + public synchronized void removeCacheNodeOrder(CacheNode node) { + this.ordersMap.remove(node.getCacheKey()); + } + + private static final Logger log = LoggerFactory + .getLogger(ConfigurationPoller.class); + + private final AWSElasticCacheClient client; + + private final long pollIntervalMills; + + private ScheduledExecutorService scheduledExecutorService; + + private volatile ClusterConfigration clusterConfigration = null; + + public ConfigurationPoller(AWSElasticCacheClient client, + long pollIntervalMills) { + super(); + this.client = client; + this.pollIntervalMills = pollIntervalMills; + this.scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactory() { + + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "AWSElasticCacheConfigPoller"); + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + }); + } + + public void start() { + this.scheduledExecutorService.scheduleWithFixedDelay(this, + this.pollIntervalMills, this.pollIntervalMills, + TimeUnit.MILLISECONDS); + } + + public void stop() { + this.scheduledExecutorService.shutdown(); + } + + public void run() { + try { + ClusterConfigration newConfig = this.client.getConfig(); + if (newConfig != null) { + ClusterConfigration currentConfig = this.clusterConfigration; + if (currentConfig == null) { + this.clusterConfigration = newConfig; + } else { + if (newConfig.getVersion() < currentConfig.getVersion()) { + log.warn("Ignored new config from ElasticCache node, it's too old, current version is: " + + currentConfig.getVersion() + + ", but the new version is: " + + newConfig.getVersion()); + return; + } else { + this.clusterConfigration = newConfig; + } + } + log.info("Retrieved new config from ElasticCache node: " + + this.clusterConfigration); + this.client.onUpdate(newConfig); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("Poll config from ElasticCache node failed", e); + } + } +} diff --git a/src/main/java/net/rubyeye/xmemcached/command/BinaryCommandFactory.java b/src/main/java/net/rubyeye/xmemcached/command/BinaryCommandFactory.java index fdc39c8a1..7046c2fed 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/BinaryCommandFactory.java +++ b/src/main/java/net/rubyeye/xmemcached/command/BinaryCommandFactory.java @@ -10,6 +10,7 @@ import net.rubyeye.xmemcached.CommandFactory; import net.rubyeye.xmemcached.buffer.BufferAllocator; import net.rubyeye.xmemcached.buffer.SimpleBufferAllocator; +import net.rubyeye.xmemcached.command.binary.BinaryAWSElasticCacheConfigCommand; import net.rubyeye.xmemcached.command.binary.BinaryAppendPrependCommand; import net.rubyeye.xmemcached.command.binary.BinaryAuthListMechanismsCommand; import net.rubyeye.xmemcached.command.binary.BinaryAuthStartCommand; @@ -43,6 +44,12 @@ @SuppressWarnings("unchecked") public class BinaryCommandFactory implements CommandFactory { + public Command createAWSElasticCacheConfigCommand(String subCommand, + String key) { + return new BinaryAWSElasticCacheConfigCommand(new CountDownLatch(1), + subCommand, key); + } + private BufferAllocator bufferAllocator = new SimpleBufferAllocator(); public void setBufferAllocator(BufferAllocator bufferAllocator) { @@ -51,8 +58,8 @@ public void setBufferAllocator(BufferAllocator bufferAllocator) { public Command createAddCommand(String key, byte[] keyBytes, int exp, Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, exp, value, CommandType.ADD, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, exp, value, + CommandType.ADD, noreply, transcoder); } public Command createAppendCommand(String key, byte[] keyBytes, @@ -69,8 +76,7 @@ public Command createCASCommand(String key, byte[] keyBytes, int exp, } public Command createDeleteCommand(String key, byte[] keyBytes, int time, - long cas, - boolean noreply) { + long cas, boolean noreply) { return new BinaryDeleteCommand(key, keyBytes, cas, CommandType.DELETE, new CountDownLatch(1), noreply); } @@ -96,9 +102,9 @@ public Command createGetMultiCommand(Collection keys, key = it.next(); if (it.hasNext()) { // first n-1 send getq command - Command command = new BinaryGetCommand(key, ByteUtils - .getBytes(key), cmdType, null, OpCode.GET_KEY_QUIETLY, - true); + Command command = new BinaryGetCommand(key, + ByteUtils.getBytes(key), cmdType, null, + OpCode.GET_KEY_QUIETLY, true); command.encode(); totalLength += command.getIoBuffer().remaining(); bufferList.add(command.getIoBuffer()); @@ -151,8 +157,8 @@ final Command createStoreCommand(String key, byte[] keyBytes, int exp, public Command createSetCommand(String key, byte[] keyBytes, int exp, Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, exp, value, CommandType.SET, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, exp, value, + CommandType.SET, noreply, transcoder); } public Command createStatsCommand(InetSocketAddress server, @@ -176,21 +182,21 @@ public Command createAuthListMechanismsCommand(CountDownLatch latch) { public Command createAuthStartCommand(String mechanism, CountDownLatch latch, byte[] authData) { - return new BinaryAuthStartCommand(mechanism, ByteUtils - .getBytes(mechanism), latch, authData); + return new BinaryAuthStartCommand(mechanism, + ByteUtils.getBytes(mechanism), latch, authData); } public Command createAuthStepCommand(String mechanism, CountDownLatch latch, byte[] authData) { - return new BinaryAuthStepCommand(mechanism, ByteUtils - .getBytes(mechanism), latch, authData); + return new BinaryAuthStepCommand(mechanism, + ByteUtils.getBytes(mechanism), latch, authData); } public Command createGetAndTouchCommand(String key, byte[] keyBytes, CountDownLatch latch, int exp, boolean noreply) { return new BinaryGetAndTouchCommand(key, keyBytes, noreply ? CommandType.GATQ : CommandType.GAT, latch, exp, - noreply); + noreply); } public Command createTouchCommand(String key, byte[] keyBytes, diff --git a/src/main/java/net/rubyeye/xmemcached/command/CommandType.java b/src/main/java/net/rubyeye/xmemcached/command/CommandType.java index 73237280e..632728bf0 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/CommandType.java +++ b/src/main/java/net/rubyeye/xmemcached/command/CommandType.java @@ -8,6 +8,8 @@ */ public enum CommandType { - NOOP, STATS, FLUSH_ALL, GET_ONE, GET_MANY, SET, REPLACE, ADD, EXCEPTION, DELETE, VERSION, QUIT, INCR, DECR, GETS_ONE, GETS_MANY, CAS, APPEND, PREPEND, GET_HIT, GET_MISS, VERBOSITY, AUTH_LIST, AUTH_START, AUTH_STEP, TOUCH, GAT, GATQ, SET_MANY; - + NOOP, STATS, FLUSH_ALL, GET_ONE, GET_MANY, SET, REPLACE, ADD, EXCEPTION, // + DELETE, VERSION, QUIT, INCR, DECR, GETS_ONE, GETS_MANY, CAS, APPEND, PREPEND, // + GET_HIT, GET_MISS, VERBOSITY, AUTH_LIST, AUTH_START, AUTH_STEP, TOUCH, GAT, GATQ, SET_MANY, // + AWS_CONFIG; } \ No newline at end of file diff --git a/src/main/java/net/rubyeye/xmemcached/command/KestrelCommandFactory.java b/src/main/java/net/rubyeye/xmemcached/command/KestrelCommandFactory.java index 3ca6b86f6..20a44c153 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/KestrelCommandFactory.java +++ b/src/main/java/net/rubyeye/xmemcached/command/KestrelCommandFactory.java @@ -28,6 +28,12 @@ @SuppressWarnings("unchecked") public class KestrelCommandFactory implements CommandFactory { + public Command createAWSElasticCacheConfigCommand(String subCommand, + String key) { + throw new UnsupportedOperationException( + "Kestrel doesn't support this operation"); + } + public Command createAddCommand(String key, byte[] keyBytes, int exp, Object value, boolean noreply, Transcoder transcoder) { throw new UnsupportedOperationException( @@ -47,8 +53,7 @@ public Command createCASCommand(String key, byte[] keyBytes, int exp, } public Command createDeleteCommand(String key, byte[] keyBytes, int time, - long cas, - boolean noreply) { + long cas, boolean noreply) { return new KestrelDeleteCommand(key, keyBytes, -1, new CountDownLatch(1), noreply); } @@ -144,8 +149,8 @@ public Command createGetAndTouchCommand(String key, byte[] keyBytes, "GAT is only supported by binary protocol"); } - public Command createTouchCommand(String key, byte[] keyBytes, CountDownLatch latch, - int exp, boolean noreply) { + public Command createTouchCommand(String key, byte[] keyBytes, + CountDownLatch latch, int exp, boolean noreply) { throw new UnsupportedOperationException( "Touch is only supported by binary protocol"); } diff --git a/src/main/java/net/rubyeye/xmemcached/command/TextCommandFactory.java b/src/main/java/net/rubyeye/xmemcached/command/TextCommandFactory.java index 216ba4303..2f4c74030 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/TextCommandFactory.java +++ b/src/main/java/net/rubyeye/xmemcached/command/TextCommandFactory.java @@ -6,6 +6,7 @@ import net.rubyeye.xmemcached.CommandFactory; import net.rubyeye.xmemcached.buffer.BufferAllocator; +import net.rubyeye.xmemcached.command.text.TextAWSElasticCacheConfigCommand; import net.rubyeye.xmemcached.command.text.TextCASCommand; import net.rubyeye.xmemcached.command.text.TextDeleteCommand; import net.rubyeye.xmemcached.command.text.TextFlushAllCommand; @@ -30,6 +31,12 @@ */ public class TextCommandFactory implements CommandFactory { + public Command createAWSElasticCacheConfigCommand(String subCommand, + String key) { + return new TextAWSElasticCacheConfigCommand(new CountDownLatch(1), + subCommand, key); + } + public void setBufferAllocator(BufferAllocator bufferAllocator) { } @@ -114,16 +121,16 @@ public final Command createCASCommand(final String key, public final Command createSetCommand(final String key, final byte[] keyBytes, final int exp, final Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, exp, value, CommandType.SET, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, exp, value, + CommandType.SET, noreply, transcoder); } @SuppressWarnings("unchecked") public final Command createAddCommand(final String key, final byte[] keyBytes, final int exp, final Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, exp, value, CommandType.ADD, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, exp, value, + CommandType.ADD, noreply, transcoder); } @SuppressWarnings("unchecked") @@ -138,16 +145,16 @@ public final Command createReplaceCommand(final String key, public final Command createAppendCommand(final String key, final byte[] keyBytes, final Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, 0, value, CommandType.APPEND, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, 0, value, + CommandType.APPEND, noreply, transcoder); } @SuppressWarnings("unchecked") public final Command createPrependCommand(final String key, final byte[] keyBytes, final Object value, boolean noreply, Transcoder transcoder) { - return this.createStoreCommand(key, keyBytes, 0, value, CommandType.PREPEND, - noreply, transcoder); + return this.createStoreCommand(key, keyBytes, 0, value, + CommandType.PREPEND, noreply, transcoder); } @SuppressWarnings("unchecked") diff --git a/src/main/java/net/rubyeye/xmemcached/command/binary/BinaryAWSElasticCacheConfigCommand.java b/src/main/java/net/rubyeye/xmemcached/command/binary/BinaryAWSElasticCacheConfigCommand.java new file mode 100644 index 000000000..7b3e03369 --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/command/binary/BinaryAWSElasticCacheConfigCommand.java @@ -0,0 +1,88 @@ +/** + *Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)] + *Licensed under the Apache License, Version 2.0 (the "License"); + *you may not use this file except in compliance with the License. + *You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an "AS IS" BASIS, + *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + *either express or implied. See the License for the specific language governing permissions and limitations under the License + */ +/** + *Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)] + *Licensed under the Apache License, Version 2.0 (the "License"); + *you may not use this file except in compliance with the License. + *You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an "AS IS" BASIS, + *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + *either express or implied. See the License for the specific language governing permissions and limitations under the License + */ +package net.rubyeye.xmemcached.command.binary; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +import net.rubyeye.xmemcached.command.CommandType; +import net.rubyeye.xmemcached.transcoders.CachedData; +import net.rubyeye.xmemcached.utils.ByteUtils; + +/** + * AWS ElasticCache config command + * + * @author dennis + * + */ +public class BinaryAWSElasticCacheConfigCommand extends BaseBinaryCommand { + + public BinaryAWSElasticCacheConfigCommand(final CountDownLatch latch, + String subCommand, String key) { + super(key, ByteUtils.getBytes(key), CommandType.AWS_CONFIG, latch, 0, + 0, latch, false, null); + this.commandType = CommandType.AWS_CONFIG; + if (subCommand.equals("get")) { + this.opCode = OpCode.CONFIG_GET; + } else if (subCommand.equals("set")) { + this.opCode = OpCode.CONFIG_SET; + } else if (subCommand.equals("delete")) { + this.opCode = OpCode.CONFIG_DEL; + } + } + + @Override + protected boolean readValue(ByteBuffer buffer, int bodyLength, + int keyLength, int extrasLength) { + int valueLength = bodyLength - keyLength - extrasLength; + if (buffer.remaining() < valueLength) { + return false; + } + byte[] bytes = new byte[valueLength]; + buffer.get(bytes); + setResult(new String(bytes)); + countDownLatch(); + return true; + } + + @Override + protected void fillExtras(CachedData data) { + // must not have extras + } + + @Override + protected void fillValue(CachedData data) { + // must not have value + } + + @Override + protected byte getExtrasLength() { + return 0; + } + + @Override + protected int getValueLength(CachedData data) { + return 0; + } + +} diff --git a/src/main/java/net/rubyeye/xmemcached/command/binary/OpCode.java b/src/main/java/net/rubyeye/xmemcached/command/binary/OpCode.java index f57f14674..9aa05e461 100644 --- a/src/main/java/net/rubyeye/xmemcached/command/binary/OpCode.java +++ b/src/main/java/net/rubyeye/xmemcached/command/binary/OpCode.java @@ -260,6 +260,29 @@ public byte fieldValue() { public byte fieldValue() { return 0x1e; + } + }, + // AWS ElasticCache config commands + // https://github.com/awslabs/aws-elasticache-cluster-client-memcached-for-java/commit/70bf7643963500db20749d97c071b64b954eabb3 + CONFIG_GET { + @Override + public byte fieldValue() { + return 0x60; + + } + }, + CONFIG_SET { + @Override + public byte fieldValue() { + return 0x64; + + } + }, + CONFIG_DEL { + @Override + public byte fieldValue() { + return 0x66; + } }; diff --git a/src/main/java/net/rubyeye/xmemcached/command/text/TextAWSElasticCacheConfigCommand.java b/src/main/java/net/rubyeye/xmemcached/command/text/TextAWSElasticCacheConfigCommand.java new file mode 100644 index 000000000..4bac2df4f --- /dev/null +++ b/src/main/java/net/rubyeye/xmemcached/command/text/TextAWSElasticCacheConfigCommand.java @@ -0,0 +1,69 @@ +package net.rubyeye.xmemcached.command.text; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +import com.google.code.yanf4j.buffer.IoBuffer; + +import net.rubyeye.xmemcached.command.Command; +import net.rubyeye.xmemcached.command.CommandType; +import net.rubyeye.xmemcached.impl.MemcachedTCPSession; +import net.rubyeye.xmemcached.networking.MemcachedSession; +import net.rubyeye.xmemcached.utils.ByteUtils; + +/** + * AWS ElasticCache config command, see Adding Auto Discovery To Your Client Library. Only supports Cache Engine + * version 1.4.14 or higher. + * + * @author dennis + * + */ +public class TextAWSElasticCacheConfigCommand extends Command { + + private String key; + + private String subCommand; + + public TextAWSElasticCacheConfigCommand(final CountDownLatch latch, + String subCommand, String key) { + super(subCommand + key, CommandType.AWS_CONFIG, latch); + this.key = key; + this.subCommand = subCommand; + this.result = new StringBuilder(); + } + + @Override + public boolean decode(MemcachedTCPSession session, ByteBuffer buffer) { + String line = null; + while ((line = ByteUtils.nextLine(buffer)) != null) { + if (line.equals("END")) { // at the end + return done(session); + } else if (line.startsWith("CONFIG")) { + // ignore + } else { + ((StringBuilder) this.getResult()).append(line); + } + } + return false; + } + + private final boolean done(MemcachedSession session) { + setResult(this.getResult().toString()); + countDownLatch(); + return true; + } + + @Override + public void encode() { + // config [sub-command] [key] + final byte[] subCmdBytes = ByteUtils.getBytes(this.subCommand); + final byte[] keyBytes = ByteUtils.getBytes(this.key); + this.ioBuffer = IoBuffer.allocate(6 + 1 + subCmdBytes.length + 1 + + keyBytes.length + 2); + ByteUtils.setArguments(this.ioBuffer, "config", subCmdBytes, keyBytes); + this.ioBuffer.flip(); + } + +} diff --git a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedConnector.java b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedConnector.java index c1dffec06..33b85ab02 100644 --- a/src/main/java/net/rubyeye/xmemcached/impl/MemcachedConnector.java +++ b/src/main/java/net/rubyeye/xmemcached/impl/MemcachedConnector.java @@ -65,7 +65,7 @@ * @author dennis */ public class MemcachedConnector extends SocketChannelController implements -Connector { + Connector { private final DelayQueue waitingQueue = new DelayQueue(); private BufferAllocator bufferAllocator; @@ -80,12 +80,18 @@ public class MemcachedConnector extends SocketChannelController implements private boolean failureMode; private final ConcurrentHashMap/* - * standby - * sessions - */> standbySessionMap = new ConcurrentHashMap>(); + * standby + * sessions + */> standbySessionMap = new ConcurrentHashMap>(); private final FlowControl flowControl; + private volatile boolean shuttingDown = false; + + public void shuttingDown() { + this.shuttingDown = true; + } + public void setSessionLocator(MemcachedSessionLocator sessionLocator) { this.sessionLocator = sessionLocator; } @@ -103,7 +109,8 @@ public SessionMonitor() { @Override public void run() { - while (MemcachedConnector.this.isStarted() && MemcachedConnector.this.enableHealSession) { + while (MemcachedConnector.this.isStarted() + && MemcachedConnector.this.enableHealSession) { ReconnectRequest request = null; try { request = MemcachedConnector.this.waitingQueue.take(); @@ -171,7 +178,7 @@ private void rescheduleConnectRequest(ReconnectRequest request) { public void setEnableHealSession(boolean enableHealSession) { this.enableHealSession = enableHealSession; - //wake up session monitor thread. + // wake up session monitor thread. if (this.sessionMonitor != null && this.sessionMonitor.isAlive()) { this.sessionMonitor.interrupt(); } @@ -205,7 +212,7 @@ public void setOptimizeGet(boolean optimiezeGet) { public void setOptimizeMergeBuffer(boolean optimizeMergeBuffer) { ((OptimizerMBean) this.optimiezer) - .setOptimizeMergeBuffer(optimizeMergeBuffer); + .setOptimizeMergeBuffer(optimizeMergeBuffer); } public Protocol getProtocol() { @@ -214,7 +221,6 @@ public Protocol getProtocol() { protected MemcachedSessionLocator sessionLocator; - protected final ConcurrentHashMap> sessionMap = new ConcurrentHashMap>(); public synchronized void addSession(Session session) { @@ -318,7 +324,7 @@ public void removeReconnectRequest(InetSocketAddress inetSocketAddress) { it.remove(); log.warn("Remove invalid reconnect task for " + request.getInetSocketAddressWrapper() - .getInetSocketAddress()); + .getInetSocketAddress()); } } } @@ -352,7 +358,8 @@ private void removeMainSession(Session session) { InetSocketAddress remoteSocketAddress = session .getRemoteSocketAddress(); // If it was in failure mode,we don't remove closed session from list. - if (this.failureMode) { + if (this.failureMode && ((MemcachedSession) session).isAllowReconnect() + && !this.shuttingDown && this.isStarted()) { log.warn("Client in failure mode,we don't remove session " + SystemUtils.getRawAddress(remoteSocketAddress) + ":" + remoteSocketAddress.getPort()); @@ -403,8 +410,8 @@ public void onConnect(SelectionKey key) throws IOException { + SystemUtils.getRawAddress(future .getInetSocketAddressWrapper() .getInetSocketAddress()) - + ":" - + future.getInetSocketAddressWrapper() + + ":" + + future.getInetSocketAddressWrapper() .getInetSocketAddress().getPort() + " fail")); } else { key.attach(null); @@ -420,10 +427,10 @@ public void onConnect(SelectionKey key) throws IOException { + SystemUtils.getRawAddress(future .getInetSocketAddressWrapper() .getInetSocketAddress()) - + ":" - + future.getInetSocketAddressWrapper() + + ":" + + future.getInetSocketAddressWrapper() .getInetSocketAddress().getPort() + " fail," - + e.getMessage()); + + e.getMessage()); } } @@ -502,8 +509,8 @@ public Session send(final Command msg) throws MemcachedException { throw new MemcachedException("Session(" + SystemUtils.getRawAddress(session .getRemoteSocketAddress()) + ":" - + session.getRemoteSocketAddress().getPort() - + ") has been closed"); + + session.getRemoteSocketAddress().getPort() + + ") has been closed"); } if (session.isAuthFailed()) { throw new MemcachedException("Auth failed to connection " @@ -538,6 +545,7 @@ public List getStandbySessionListByMainNodeAddr( } private final SessionMonitor sessionMonitor = new SessionMonitor(); + /** * Inner state listenner,manage session monitor. * diff --git a/src/main/java/net/rubyeye/xmemcached/utils/ByteUtils.java b/src/main/java/net/rubyeye/xmemcached/utils/ByteUtils.java index 56f935fab..f5b8454ef 100644 --- a/src/main/java/net/rubyeye/xmemcached/utils/ByteUtils.java +++ b/src/main/java/net/rubyeye/xmemcached/utils/ByteUtils.java @@ -48,6 +48,10 @@ public final class ByteUtils { private ByteUtils() { } + public static boolean isValidString(String s) { + return s != null && s.trim().length() > 0; + } + public static boolean isNumber(String string) { if (string == null || string.isEmpty()) { return false; diff --git a/src/main/java/net/rubyeye/xmemcached/utils/InetSocketAddressWrapper.java b/src/main/java/net/rubyeye/xmemcached/utils/InetSocketAddressWrapper.java index 853fce18e..f2c331710 100644 --- a/src/main/java/net/rubyeye/xmemcached/utils/InetSocketAddressWrapper.java +++ b/src/main/java/net/rubyeye/xmemcached/utils/InetSocketAddressWrapper.java @@ -39,7 +39,7 @@ public void setRemoteAddressStr(String remoteAddressStr) { } public final InetSocketAddress getInetSocketAddress() { - if (isValidHostName(this.hostName)) { + if (ByteUtils.isValidString(this.hostName)) { // If it has a hostName, we try to resolve it again. return new InetSocketAddress(this.hostName, this.inetSocketAddress.getPort()); @@ -48,10 +48,6 @@ public final InetSocketAddress getInetSocketAddress() { } } - private boolean isValidHostName(String h) { - return h != null && h.trim().length() > 0; - } - public final void setInetSocketAddress(InetSocketAddress inetSocketAddress) { this.inetSocketAddress = inetSocketAddress; if (inetSocketAddress != null) { @@ -72,7 +68,7 @@ public void setWeight(int weight) { } public InetSocketAddress getMainNodeAddress() { - if (this.isValidHostName(this.mainNodeHostName)) { + if (ByteUtils.isValidString(this.mainNodeHostName)) { return new InetSocketAddress(this.mainNodeHostName, this.mainNodeAddress.getPort()); } else { diff --git a/src/test/java/net/rubyeye/xmemcached/test/unittest/AWSElasticCacheClientIT.java b/src/test/java/net/rubyeye/xmemcached/test/unittest/AWSElasticCacheClientIT.java new file mode 100644 index 000000000..33d42c0b7 --- /dev/null +++ b/src/test/java/net/rubyeye/xmemcached/test/unittest/AWSElasticCacheClientIT.java @@ -0,0 +1,151 @@ +package net.rubyeye.xmemcached.test.unittest; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Properties; + +import net.rubyeye.xmemcached.aws.AWSElasticCacheClient; +import net.rubyeye.xmemcached.aws.ClusterConfigration; +import net.rubyeye.xmemcached.utils.AddrUtil; + +import org.junit.Test; + +import com.google.code.yanf4j.core.Session; +import com.google.code.yanf4j.core.impl.HandlerAdapter; +import com.google.code.yanf4j.core.impl.TextLineCodecFactory; +import com.google.code.yanf4j.nio.TCPController; +import com.google.code.yanf4j.util.ResourcesUtils; + +import junit.framework.TestCase; + +public class AWSElasticCacheClientIT extends TestCase { + + private String serverList; + private List addresses; + + /** + * elasticcache config node mock handler + * + * @author dennis + * + */ + private static final class MockHandler extends HandlerAdapter { + private final String response; + private int version; + + public MockHandler(int version, String response) { + super(); + this.response = response; + this.version = version; + } + + @Override + public void onMessageReceived(Session session, Object message) { + if (message.equals("quit")) { + session.close(); + return; + } + session.write("CONFIG cluster 0 " + this.response.length()); + session.write(String.valueOf(version) + "\n" + this.response); + session.write("END"); + this.version++; + } + + } + + @Override + public void setUp() throws Exception { + Properties properties = ResourcesUtils + .getResourceAsProperties("test.properties"); + List addresses = AddrUtil.getAddresses(properties + .getProperty("test.memcached.servers")); + StringBuffer sb = new StringBuffer(); + boolean wasFirst = true; + for (InetSocketAddress addr : addresses) { + if (wasFirst) { + wasFirst = false; + } else { + sb.append(" "); + } + sb.append(addr.getHostName() + "|" + addr.getHostName() + "|" + + addr.getPort()); + + } + + this.addresses = addresses; + serverList = sb.toString(); + } + + @Test + public void testInvalidConfig() throws Exception { + TCPController configServer = new TCPController(); + int version = 10; + configServer.setHandler(new MockHandler(version, "invalid")); + configServer.setCodecFactory(new TextLineCodecFactory()); + configServer.bind(new InetSocketAddress(2271)); + + try { + AWSElasticCacheClient client = new AWSElasticCacheClient( + new InetSocketAddress(2271)); + fail(); + } catch (IllegalStateException e) { + assert (e.getMessage().contains("Invalid server")); + } finally { + configServer.stop(); + } + } + + @Test + public void testPollConfigAndUsage() throws Exception { + TCPController configServer = new TCPController(); + int version = 10; + configServer.setHandler(new MockHandler(version, serverList)); + configServer.setCodecFactory(new TextLineCodecFactory()); + configServer.bind(new InetSocketAddress(2271)); + + try { + AWSElasticCacheClient client = new AWSElasticCacheClient( + new InetSocketAddress(2271)); + ClusterConfigration config = client.getCurrentConfig(); + assertEquals(config.getVersion(), version); + assertEquals(addresses.size(), config.getNodeList().size()); + + client.set("aws-cache", 0, "foobar"); + assertEquals("foobar", client.get("aws-cache")); + } finally { + configServer.stop(); + } + } + + @Test + public void testPollConfigInterval() throws Exception { + TCPController cs1 = new TCPController(); + int version = 10; + cs1.setHandler(new MockHandler(version, "localhost|localhost|2272")); + cs1.setCodecFactory(new TextLineCodecFactory()); + cs1.bind(new InetSocketAddress(2271)); + TCPController cs2 = new TCPController(); + cs2.setHandler(new MockHandler(version + 1, + "localhost|localhost|2271 localhost|localhost|2272")); + cs2.setCodecFactory(new TextLineCodecFactory()); + cs2.bind(new InetSocketAddress(2272)); + + try { + AWSElasticCacheClient client = new AWSElasticCacheClient( + new InetSocketAddress(2271), 3000); + ClusterConfigration config = client.getCurrentConfig(); + assertEquals(config.getVersion(), version); + assertEquals(1, config.getNodeList().size()); + assertEquals(2272, config.getNodeList().get(0).getPort()); + Thread.sleep(3500); + config = client.getCurrentConfig(); + assertEquals(config.getVersion(), version + 1); + assertEquals(2, config.getNodeList().size()); + assertEquals(2271, config.getNodeList().get(0).getPort()); + assertEquals(2272, config.getNodeList().get(1).getPort()); + } finally { + cs1.stop(); + cs2.stop(); + } + } +}