Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AddressMemcachedSessionComparator #99

Merged
merged 1 commit into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/main/java/net/rubyeye/xmemcached/MemcachedClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ public interface MemcachedClientBuilder {
*/
public void setSessionLocator(MemcachedSessionLocator sessionLocator);

/**
*
* @return net.rubyeye.xmemcached.MemcachedSessionComparator
*/
public MemcachedSessionComparator getSessionComparator();

/**
* Set the XmemcachedClient's session comparator.Use IndexMemcachedSessionComparator by default.
*
* @param sessionComparator
*/
public void setSessionComparator(MemcachedSessionComparator sessionComparator);

public BufferAllocator getBufferAllocator();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)] Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance with the License. You
* may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License
*/
package net.rubyeye.xmemcached;

import java.util.Comparator;
import com.google.code.yanf4j.core.Session;

/**
* Session comparator.
*
* @author Jungsub Shin
*
*/
public interface MemcachedSessionComparator extends Comparator<Session> {
/**
* Returns a session by special key.
*
* @param key
* @return
*/
public int compare(Session o1, Session o2);
}
61 changes: 40 additions & 21 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import net.rubyeye.xmemcached.impl.MemcachedConnector;
import net.rubyeye.xmemcached.impl.MemcachedHandler;
import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.impl.ReconnectRequest;
import net.rubyeye.xmemcached.monitor.Constants;
import net.rubyeye.xmemcached.monitor.MemcachedClientNameHolder;
Expand All @@ -79,6 +80,7 @@ public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient

private static final Logger log = LoggerFactory.getLogger(XMemcachedClient.class);
protected MemcachedSessionLocator sessionLocator;
protected MemcachedSessionComparator sessionComparator;
private volatile boolean shutdown;
protected MemcachedConnector connector;
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -182,6 +184,10 @@ public final MemcachedSessionLocator getSessionLocator() {
return this.sessionLocator;
}

public final MemcachedSessionComparator getSessionComparator() {
return this.sessionComparator;
}

public final CommandFactory getCommandFactory() {
return this.commandFactory;
}
Expand Down Expand Up @@ -343,8 +349,8 @@ public XMemcachedClient(final String host, final int port, int weight) throws IO
throw new IllegalArgumentException("weight<=0");
}
this.checkServerPort(host, port);
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder());
this.start0();
Expand Down Expand Up @@ -653,12 +659,15 @@ void setMaxQueuedNoReplyOperations(int maxQueuedNoReplyOperations) {
}

@SuppressWarnings("unchecked")
private void buildConnector(MemcachedSessionLocator locator, BufferAllocator bufferAllocator,
private void buildConnector(MemcachedSessionLocator locator,
MemcachedSessionComparator comparator, BufferAllocator bufferAllocator,
Configuration configuration, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder) {
if (locator == null) {
locator = new ArrayMemcachedSessionLocator();

}
if (comparator == null) {
comparator = new IndexMemcachedSessionComparator();
}
if (bufferAllocator == null) {
bufferAllocator = new SimpleBufferAllocator();
Expand All @@ -683,8 +692,10 @@ private void buildConnector(MemcachedSessionLocator locator, BufferAllocator buf
this.shutdown = true;
this.transcoder = transcoder;
this.sessionLocator = locator;
this.sessionComparator = comparator;
this.connector = this.newConnector(bufferAllocator, configuration, this.sessionLocator,
this.commandFactory, this.connectionPoolSize, this.maxQueuedNoReplyOperations);
this.sessionComparator, this.commandFactory, this.connectionPoolSize,
this.maxQueuedNoReplyOperations);
this.memcachedHandler = new MemcachedHandler(this);
this.connector.setHandler(this.memcachedHandler);
this.connector.setCodecFactory(new MemcachedCodecFactory());
Expand All @@ -699,11 +710,13 @@ private void buildConnector(MemcachedSessionLocator locator, BufferAllocator buf

protected MemcachedConnector newConnector(BufferAllocator bufferAllocator,
Configuration configuration, MemcachedSessionLocator memcachedSessionLocator,
CommandFactory commandFactory, int poolSize, int maxQueuedNoReplyOperations) {
MemcachedSessionComparator memcachedSessionComparator, CommandFactory commandFactory,
int poolSize, int maxQueuedNoReplyOperations) {
// make sure dispatch message thread count is zero
configuration.setDispatchMessageThreadCount(0);
return new MemcachedConnector(configuration, memcachedSessionLocator, bufferAllocator,
commandFactory, poolSize, maxQueuedNoReplyOperations);
return new MemcachedConnector(configuration, memcachedSessionLocator,
memcachedSessionComparator, bufferAllocator, commandFactory, poolSize,
maxQueuedNoReplyOperations);
}

private final void registerMBean() {
Expand Down Expand Up @@ -747,8 +760,8 @@ public XMemcachedClient(final InetSocketAddress inetSocketAddress, int weight,
if (weight <= 0) {
throw new IllegalArgumentException("weight<=0");
}
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, new SerializingTranscoder());
this.start0();
this.connect(new InetSocketAddressWrapper(inetSocketAddress,
Expand All @@ -766,8 +779,8 @@ public XMemcachedClient(final InetSocketAddress inetSocketAddress) throws IOExce

public XMemcachedClient() throws IOException {
super();
this.buildConnector(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder());
this.start0();
Expand All @@ -778,6 +791,7 @@ public XMemcachedClient() throws IOException {
* instance by this method, use MemcachedClientBuilder instead.
*
* @param locator
* @param comparator
* @param allocator
* @param conf
* @param commandFactory
Expand All @@ -787,17 +801,19 @@ public XMemcachedClient() throws IOException {
* @throws IOException
*/
@SuppressWarnings("unchecked")
public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocator,
Configuration conf, Map<SocketOption, Object> socketOptions, CommandFactory commandFactory,
Transcoder transcoder, Map<InetSocketAddress, InetSocketAddress> addressMap,
public XMemcachedClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
Map<InetSocketAddress, InetSocketAddress> addressMap,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> map,
int poolSize, long connectTimeout, String name, boolean failureMode) throws IOException {
super();
this.setConnectTimeout(connectTimeout);
this.setFailureMode(failureMode);
this.setName(name);
this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0 : addressMap.size());
this.buildConnector(locator, allocator, conf, socketOptions, commandFactory, transcoder);
this.buildConnector(locator, comparator, allocator, conf, socketOptions, commandFactory,
transcoder);
if (stateListeners != null) {
for (MemcachedClientStateListener stateListener : stateListeners) {
this.addStateListener(stateListener);
Expand All @@ -824,6 +840,7 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
* XMemcachedClient constructor.
*
* @param locator
* @param comparator
* @param allocator
* @param conf
* @param commandFactory
Expand All @@ -834,8 +851,9 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
* @throws IOException
*/
@SuppressWarnings("unchecked")
XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocator, Configuration conf,
Map<SocketOption, Object> socketOptions, CommandFactory commandFactory, Transcoder transcoder,
XMemcachedClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
Map<InetSocketAddress, InetSocketAddress> addressMap, int[] weights,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> infoMap,
int poolSize, long connectTimeout, final String name, boolean failureMode)
Expand All @@ -862,7 +880,8 @@ public XMemcachedClient(MemcachedSessionLocator locator, BufferAllocator allocat
throw new IllegalArgumentException("weights.length is less than addressList.size()");
}
this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0 : addressMap.size());
this.buildConnector(locator, allocator, conf, socketOptions, commandFactory, transcoder);
this.buildConnector(locator, comparator, allocator, conf, socketOptions, commandFactory,
transcoder);
if (stateListeners != null) {
for (MemcachedClientStateListener stateListener : stateListeners) {
this.addStateListener(stateListener);
Expand Down Expand Up @@ -932,8 +951,8 @@ public XMemcachedClient(List<InetSocketAddress> addressList, CommandFactory cmdF
throw new IllegalArgumentException("Empty address list");
}
BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
this.buildConnector(new ArrayMemcachedSessionLocator(), simpleBufferAllocator,
XMemcachedClientBuilder.getDefaultConfiguration(),
this.buildConnector(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
simpleBufferAllocator, XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory, new SerializingTranscoder());
this.start0();
for (InetSocketAddress inetSocketAddress : addressList) {
Expand Down
38 changes: 31 additions & 7 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import net.rubyeye.xmemcached.command.TextCommandFactory;
import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
import net.rubyeye.xmemcached.impl.DefaultKeyProvider;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.impl.RandomMemcachedSessionLocaltor;
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
import net.rubyeye.xmemcached.transcoders.Transcoder;
Expand All @@ -35,6 +36,7 @@ public class XMemcachedClientBuilder implements MemcachedClientBuilder {
private static final Logger log = LoggerFactory.getLogger(XMemcachedClientBuilder.class);

protected MemcachedSessionLocator sessionLocator = new ArrayMemcachedSessionLocator();
protected MemcachedSessionComparator sessionComparator = new IndexMemcachedSessionComparator();
protected BufferAllocator bufferAllocator = new SimpleBufferAllocator();
protected Configuration configuration = getDefaultConfiguration();
protected Map<InetSocketAddress, InetSocketAddress> addressMap =
Expand Down Expand Up @@ -265,6 +267,28 @@ public void setSessionLocator(MemcachedSessionLocator sessionLocator) {
this.sessionLocator = sessionLocator;
}

/*
* (non-Javadoc)
*
* @see net.rubyeye.xmemcached.MemcachedClientBuilder#getSessionComparator()
*/
public MemcachedSessionComparator getSessionComparator() {
return this.sessionComparator;
}

/*
* (non-Javadoc)
*
* @see net.rubyeye.xmemcached.MemcachedClientBuilder#setSessionComparator(net. rubyeye
* .xmemcached.MemcachedSessionComparator)
*/
public void setSessionComparator(MemcachedSessionComparator sessionComparator) {
if (sessionComparator == null) {
throw new IllegalArgumentException("Null SessionComparator");
}
this.sessionComparator = sessionComparator;
}

/*
* (non-Javadoc)
*
Expand Down Expand Up @@ -321,10 +345,10 @@ public MemcachedClient build() throws IOException {
}
}
if (this.weights == null) {
memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.addressMap, this.stateListeners, this.authInfoMap, this.connectionPoolSize,
this.connectTimeout, this.name, this.failureMode);
memcachedClient = new XMemcachedClient(this.sessionLocator, this.sessionComparator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.addressMap, this.stateListeners, this.authInfoMap,
this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode);

} else {
if (this.addressMap == null) {
Expand All @@ -333,9 +357,9 @@ public MemcachedClient build() throws IOException {
if (this.addressMap.size() > this.weights.length) {
throw new IllegalArgumentException("Weights Array's length is less than server's number");
}
memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.addressMap, this.weights, this.stateListeners, this.authInfoMap,
memcachedClient = new XMemcachedClient(this.sessionLocator, this.sessionComparator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.addressMap, this.weights, this.stateListeners, this.authInfoMap,
this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode);
}
this.configureClient(memcachedClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.code.yanf4j.core.SocketOption;
import net.rubyeye.xmemcached.CommandFactory;
import net.rubyeye.xmemcached.MemcachedClientStateListener;
import net.rubyeye.xmemcached.MemcachedSessionComparator;
import net.rubyeye.xmemcached.MemcachedSessionLocator;
import net.rubyeye.xmemcached.XMemcachedClient;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
Expand All @@ -25,6 +26,7 @@
import net.rubyeye.xmemcached.command.TextCommandFactory;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
import net.rubyeye.xmemcached.impl.IndexMemcachedSessionComparator;
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
import net.rubyeye.xmemcached.transcoders.Transcoder;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
Expand Down Expand Up @@ -184,8 +186,8 @@ public AWSElasticCacheClient(List<InetSocketAddress> addrs, long pollConfigInter
@SuppressWarnings("unchecked")
public AWSElasticCacheClient(List<InetSocketAddress> addrs, long pollConfigIntervalMills,
CommandFactory commandFactory) throws IOException {
this(new ArrayMemcachedSessionLocator(), new SimpleBufferAllocator(),
XMemcachedClientBuilder.getDefaultConfiguration(),
this(new ArrayMemcachedSessionLocator(), new IndexMemcachedSessionComparator(),
new SimpleBufferAllocator(), XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(), new TextCommandFactory(),
new SerializingTranscoder(), (List<MemcachedClientStateListener>) Collections.EMPTY_LIST,
(Map<InetSocketAddress, AuthInfo>) Collections.EMPTY_MAP, 1,
Expand All @@ -203,13 +205,13 @@ private static Map<InetSocketAddress, InetSocketAddress> getAddressMapFromConfig
return m;
}

AWSElasticCacheClient(MemcachedSessionLocator locator, BufferAllocator allocator,
Configuration conf, Map<SocketOption, Object> socketOptions, CommandFactory commandFactory,
Transcoder transcoder, List<MemcachedClientStateListener> stateListeners,
Map<InetSocketAddress, AuthInfo> map, int poolSize, long connectTimeout, String name,
boolean failureMode, List<InetSocketAddress> configAddrs, long pollConfigIntervalMills)
throws IOException {
super(locator, allocator, conf, socketOptions, commandFactory, transcoder,
AWSElasticCacheClient(MemcachedSessionLocator locator, MemcachedSessionComparator comparator,
BufferAllocator allocator, Configuration conf, Map<SocketOption, Object> socketOptions,
CommandFactory commandFactory, Transcoder transcoder,
List<MemcachedClientStateListener> stateListeners, Map<InetSocketAddress, AuthInfo> map,
int poolSize, long connectTimeout, String name, boolean failureMode,
List<InetSocketAddress> configAddrs, long pollConfigIntervalMills) throws IOException {
super(locator, comparator, allocator, conf, socketOptions, commandFactory, transcoder,
getAddressMapFromConfigAddrs(configAddrs), stateListeners, map, poolSize, connectTimeout,
name, failureMode);
if (pollConfigIntervalMills <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public AWSElasticCacheClientBuilder(List<InetSocketAddress> configAddrs) {
@Override
public AWSElasticCacheClient build() throws IOException {

AWSElasticCacheClient memcachedClient = new AWSElasticCacheClient(this.sessionLocator,
this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory,
this.transcoder, this.stateListeners, this.authInfoMap, this.connectionPoolSize,
this.connectTimeout, this.name, this.failureMode, configAddrs, this.pollConfigIntervalMs);
AWSElasticCacheClient memcachedClient =
new AWSElasticCacheClient(this.sessionLocator, this.sessionComparator, this.bufferAllocator,
this.configuration, this.socketOptions, this.commandFactory, this.transcoder,
this.stateListeners, this.authInfoMap, this.connectionPoolSize, this.connectTimeout,
this.name, this.failureMode, configAddrs, this.pollConfigIntervalMs);
this.configureClient(memcachedClient);

return memcachedClient;
Expand Down
Loading