Skip to content

Commit

Permalink
Merge pull request #99 from ssup2/master
Browse files Browse the repository at this point in the history
Add AddressMemcachedSessionComparator
  • Loading branch information
killme2008 authored Mar 25, 2019
2 parents a78a457 + 93154f4 commit 6615091
Show file tree
Hide file tree
Showing 13 changed files with 471 additions and 51 deletions.
13 changes: 13 additions & 0 deletions src/main/java/net/rubyeye/xmemcached/MemcachedClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ public interface MemcachedClientBuilder {
*/
public void setSessionLocator(MemcachedSessionLocator sessionLocator);

/**
*
* @return net.rubyeye.xmemcached.MemcachedSessionComparator
*/
public MemcachedSessionComparator getSessionComparator();

/**
* Set the XmemcachedClient's session comparator.Use IndexMemcachedSessionComparator by default.
*
* @param sessionComparator
*/
public void setSessionComparator(MemcachedSessionComparator sessionComparator);

public BufferAllocator getBufferAllocator();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)] Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance with the License. You
* may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License
*/
package net.rubyeye.xmemcached;

import java.util.Comparator;
import com.google.code.yanf4j.core.Session;

/**
* Session comparator.
*
* @author Jungsub Shin
*
*/
public interface MemcachedSessionComparator extends Comparator<Session> {
/**
* Returns a session by special key.
*
* @param key
* @return
*/
public int compare(Session o1, Session o2);
}
61 changes: 40 additions & 21 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import net.rubyeye.xmemcached.impl.MemcachedConnector;
import net.rubyeye.xmemcached.impl.MemcachedHandler;
import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.impl.ReconnectRequest;
import net.rubyeye.xmemcached.monitor.Constants;
import net.rubyeye.xmemcached.monitor.MemcachedClientNameHolder;
Expand All @@ -79,6 +80,7 @@ public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient

private static final Logger log = LoggerFactory.getLogger(XMemcachedClient.class);
protected MemcachedSessionLocator sessionLocator;
protected MemcachedSessionComparator sessionComparator;
private volatile boolean shutdown;
protected MemcachedConnector connector;
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -182,6 +184,10 @@ public final MemcachedSessionLocator getSessionLocator() {
return this.sessionLocator;
}

public final MemcachedSessionComparator getSessionComparator() {
return this.sessionComparator;
}

public final CommandFactory getCommandFactory() {
return this.commandFactory;
}
Expand Down Expand Up @@ -343,8 +349,8 @@ public XMemcachedClient(final String host, final int port, int weight) throws IO
throw new IllegalArgumentException("weight<=0");
}
this.checkServerPort(host, port);
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder());
this.start0();
Expand Down Expand Up @@ -653,12 +659,15 @@ void setMaxQueuedNoReplyOperations(int maxQueuedNoReplyOperations) {
}

@SuppressWarnings("unchecked")
private void buildConnector(MemcachedSessionLocator locator, BufferAllocator bufferAllocator,
private void buildConnector(MemcachedSessionLocator locator,
MemcachedSessionComparator comparator, BufferAllocator bufferAllocator,
Configuration configuration, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder) {
if (locator == null) {
locator = new ArrayMemcachedSessionLocator();

}
if (comparator == null) {
comparator = new IndexMemcachedSessionComparator();
}
if (bufferAllocator == null) {
bufferAllocator = new SimpleBufferAllocator();
Expand All @@ -683,8 +692,10 @@ private void buildConnector(MemcachedSessionLocator locator, BufferAllocator buf
this.shutdown = true;
this.transcoder = transcoder;
this.sessionLocator = locator;
this.sessionComparator = comparator;
this.connector = this.newConnector(bufferAllocator, configuration, this.sessionLocator,
this.commandFactory, this.connectionPoolSize, this.maxQueuedNoReplyOperations);
this.sessionComparator, this.commandFactory, this.connectionPoolSize,
this.maxQueuedNoReplyOperations);
this.memcachedHandler = new MemcachedHandler(this);
this.connector.setHandler(this.memcachedHandler);
this.connector.setCodecFactory(new MemcachedCodecFactory());
Expand All @@ -699,11 +710,13 @@ private void buildConnector(MemcachedSessionLocator locator, BufferAllocator buf

protected MemcachedConnector newConnector(BufferAllocator bufferAllocator,
Configuration configuration, MemcachedSessionLocator memcachedSessionLocator,
CommandFactory commandFactory, int poolSize, int maxQueuedNoReplyOperations) {
MemcachedSessionComparator memcachedSessionComparator, CommandFactory commandFactory,
int poolSize, int maxQueuedNoReplyOperations) {
// make sure dispatch message thread count is zero
configuration.setDispatchMessageThreadCount(0);
return new MemcachedConnector(configuration, memcachedSessionLocator, bufferAllocator,
commandFactory, poolSize, maxQueuedNoReplyOperations);
return new MemcachedConnector(configuration, memcachedSessionLocator,
memcachedSessionComparator, bufferAllocator, commandFactory, poolSize,
maxQueuedNoReplyOperations);
}

private final void registerMBean() {
Expand Down Expand Up @@ -747,8 +760,8 @@ public XMemcachedClient(final InetSocketAddress inetSocketAddress, int weight,
if (weight <= 0) {
throw new IllegalArgumentException("weight<=0");
}
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, new SerializingTranscoder());
this.start0();
this.connect(new InetSocketAddressWrapper(inetSocketAddress,
Expand All @@ -766,8 +779,8 @@ public XMemcachedClient(final InetSocketAddress inetSocketAddress) throws IOExce

public XMemcachedClient() throws IOException {
super();
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder());
this.start0();
Expand All @@ -778,6 +791,7 @@ public XMemcachedClient() throws IOException {
* instance by this method, use MemcachedClientBuilder instead.
*
* @param locator
* @param comparator
* @param allocator
* @param conf
* @param commandFactory
Expand All @@ -787,17 +801,19 @@ public XMemcachedClient() throws IOException {
* @throws IOException
*/
@SuppressWarnings("unchecked")
public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocator,
Configuration conf, Map<SocketOption, Object> socketOptions, CommandFactory commandFactory,
Transcoder transcoder, Map<InetSocketAddress, InetSocketAddress> addressMap,
public XMemcachedClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
Map<InetSocketAddress, InetSocketAddress> addressMap,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> map,
int poolSize, long connectTimeout, String name, boolean failureMode) throws IOException {
super();
this.setConnectTimeout(connectTimeout);
this.setFailureMode(failureMode);
this.setName(name);
this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0 : addressMap.size());
this.buildConnector(locator, allocator, conf, socketOptions, commandFactory, transcoder);
this.buildConnector(locator, comparator, allocator, conf, socketOptions, commandFactory,
transcoder);
if (stateListeners != null) {
for (MemcachedClientStateListener stateListener : stateListeners) {
this.addStateListener(stateListener);
Expand All @@ -824,6 +840,7 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
* XMemcachedClient constructor.
*
* @param locator
* @param comparator
* @param allocator
* @param conf
* @param commandFactory
Expand All @@ -834,8 +851,9 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
* @throws IOException
*/
@SuppressWarnings("unchecked")
XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocator, Configuration conf,
Map<SocketOption, Object> socketOptions, CommandFactory commandFactory, Transcoder transcoder,
XMemcachedClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
Map<InetSocketAddress, InetSocketAddress> addressMap, int[] weights,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> infoMap,
int poolSize, long connectTimeout, final String name, boolean failureMode)
Expand All @@ -862,7 +880,8 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
throw new IllegalArgumentException("weights.length is less than addressList.size()");
}
this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0 : addressMap.size());
this.buildConnector(locator, allocator, conf, socketOptions, commandFactory, transcoder);
this.buildConnector(locator, comparator, allocator, conf, socketOptions, commandFactory,
transcoder);
if (stateListeners != null) {
for (MemcachedClientStateListener stateListener : stateListeners) {
this.addStateListener(stateListener);
Expand Down Expand Up @@ -932,8 +951,8 @@ public XMemcachedClient(List<InetSocketAddress> addressList, CommandFactory cmdF
throw new IllegalArgumentException("Empty address list");
}
BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
this.buildConnector(new ArrayMemcachedSessionLocator(), simpleBufferAllocator,
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
simpleBufferAllocator, XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, new SerializingTranscoder());
this.start0();
for (InetSocketAddress inetSocketAddress : addressList) {
Expand Down
38 changes: 31 additions & 7 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import net.rubyeye.xmemcached.command.TextCommandFactory;
import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
import net.rubyeye.xmemcached.impl.DefaultKeyProvider;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.impl.RandomMemcachedSessionLocaltor;
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
import net.rubyeye.xmemcached.transcoders.Transcoder;
Expand All @@ -35,6 +36,7 @@ public class XMemcachedClientBuilder implements MemcachedClientBuilder {
private static final Logger log = LoggerFactory.getLogger(XMemcachedClientBuilder.class);

protected MemcachedSessionLocator sessionLocator = new ArrayMemcachedSessionLocator();
protected MemcachedSessionComparator sessionComparator = new IndexMemcachedSessionComparator();
protected BufferAllocator bufferAllocator = new SimpleBufferAllocator();
protected Configuration configuration = getDefaultConfiguration();
protected Map<InetSocketAddress, InetSocketAddress> addressMap =
Expand Down Expand Up @@ -265,6 +267,28 @@ public void setSessionLocator(MemcachedSessionLocator sessionLocator) {
this.sessionLocator = sessionLocator;
}

/*
* (non-Javadoc)
*
* @see net.rubyeye.xmemcached.MemcachedClientBuilder#getSessionComparator()
*/
public MemcachedSessionComparator getSessionComparator() {
return this.sessionComparator;
}

/*
* (non-Javadoc)
*
* @see net.rubyeye.xmemcached.MemcachedClientBuilder#setSessionComparator(net. rubyeye
* .xmemcached.MemcachedSessionComparator)
*/
public void setSessionComparator(MemcachedSessionComparator sessionComparator) {
if (sessionComparator == null) {
throw new IllegalArgumentException("Null SessionComparator");
}
this.sessionComparator = sessionComparator;
}

/*
* (non-Javadoc)
*
Expand Down Expand Up @@ -321,10 +345,10 @@ public MemcachedClient build() throws IOException {
}
}
if (this.weights == null) {
memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.addressMap, this.stateListeners, this.authInfoMap, this.connectionPoolSize,
this.connectTimeout, this.name, this.failureMode);
memcachedClient = new XMemcachedClient(this.sessionLocator, this.sessionComparator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.addressMap, this.stateListeners, this.authInfoMap,
this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode);

} else {
if (this.addressMap == null) {
Expand All @@ -333,9 +357,9 @@ public MemcachedClient build() throws IOException {
if (this.addressMap.size() > this.weights.length) {
throw new IllegalArgumentException("Weights Array's length is less than server's number");
}
memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.addressMap, this.weights, this.stateListeners, this.authInfoMap,
memcachedClient = new XMemcachedClient(this.sessionLocator, this.sessionComparator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.addressMap, this.weights, this.stateListeners, this.authInfoMap,
this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode);
}
this.configureClient(memcachedClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.code.yanf4j.core.SocketOption;
import net.rubyeye.xmemcached.CommandFactory;
import net.rubyeye.xmemcached.MemcachedClientStateListener;
import net.rubyeye.xmemcached.MemcachedSessionComparator;
import net.rubyeye.xmemcached.MemcachedSessionLocator;
import net.rubyeye.xmemcached.XMemcachedClient;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
Expand All @@ -25,6 +26,7 @@
import net.rubyeye.xmemcached.command.TextCommandFactory;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
import net.rubyeye.xmemcached.transcoders.Transcoder;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
Expand Down Expand Up @@ -184,8 +186,8 @@ public AWSElasticCacheClient(List<InetSocketAddress> addrs, long pollConfigInter
@SuppressWarnings("unchecked")
public AWSElasticCacheClient(List<InetSocketAddress> addrs, long pollConfigIntervalMills,
CommandFactory commandFactory) throws IOException {
this(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder(), (List<MemcachedClientStateListener>) Collections.EMPTY_LIST,
(Map<InetSocketAddress, AuthInfo>) Collections.EMPTY_MAP, 1,
Expand All @@ -203,13 +205,13 @@ private static Map<InetSocketAddress, InetSocketAddress> getAddressMapFromConfig
return m;
}

AWSElasticCacheClient(MemcachedSessionLocator locator, BufferAllocator allocator,
Configuration conf, Map<SocketOption, Object> socketOptions, CommandFactory commandFactory,
Transcoder transcoder, List<MemcachedClientStateListener> stateListeners,
Map<InetSocketAddress, AuthInfo> map, int poolSize, long connectTimeout, String name,
boolean failureMode, List<InetSocketAddress> configAddrs, long pollConfigIntervalMills)
throws IOException {
super(locator, allocator, conf, socketOptions, commandFactory, transcoder,
AWSElasticCacheClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> map,
int poolSize, long connectTimeout, String name, boolean failureMode,
List<InetSocketAddress> configAddrs, long pollConfigIntervalMills) throws IOException {
super(locator, comparator, allocator, conf, socketOptions, commandFactory, transcoder,
getAddressMapFromConfigAddrs(configAddrs), stateListeners, map, poolSize, connectTimeout,
name, failureMode);
if (pollConfigIntervalMills <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public AWSElasticCacheClientBuilder(List<InetSocketAddress> configAddrs) {
@Override
public AWSElasticCacheClient build() throws IOException {

AWSElasticCacheClient memcachedClient = new AWSElasticCacheClient(this.sessionLocator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.stateListeners, this.authInfoMap, this.connectionPoolSize,
this.connectTimeout, this.name, this.failureMode, configAddrs, this.pollConfigIntervalMs);
AWSElasticCacheClient memcachedClient =
new AWSElasticCacheClient(this.sessionLocator, this.sessionComparator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.stateListeners, this.authInfoMap, this.connectionPoolSize, this.connectTimeout,
this.name, this.failureMode, configAddrs, this.pollConfigIntervalMs);
this.configureClient(memcachedClient);

return memcachedClient;
Expand Down
Loading

0 comments on commit 6615091

Please sign in to comment.