Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support jedis cluster #3822

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public class MetadataReportConfig extends AbstractConfig {
// Request timeout in milliseconds for register center
private Integer timeout;

/**
* It is the same as registry
*/
private String cluster;

/**
* The group the metadata in . It is the same as registry
*/
Expand Down Expand Up @@ -111,6 +116,14 @@ public void setTimeout(Integer timeout) {
this.timeout = timeout;
}

public String getCluster() {
return cluster;
}

public void setCluster(String cluster) {
this.cluster = cluster;
}

public Map<String, String> getParameters() {
return parameters;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@
import java.util.function.Supplier;

/**
* MetadataReportService
*
* <p>to publish provider {@link FullServiceDefinition} and consumer define</p>
*
* @since 2.7.0
*/
public class MetadataReportService {

protected final Logger logger = LoggerFactory.getLogger(getClass());

private static volatile MetadataReportService metadataReportService;
private static Object lock = new Object();

private MetadataReportFactory metadataReportFactory = ExtensionLoader.getExtensionLoader(MetadataReportFactory.class).getAdaptiveExtension();
private static final Object LOCK = new Object();

MetadataReport metadataReport;

URL metadataReportUrl;

MetadataReportService(URL metadataReportURL) {
Expand All @@ -55,14 +60,15 @@ public class MetadataReportService {
.build();
}
this.metadataReportUrl = metadataReportURL;
MetadataReportFactory metadataReportFactory = ExtensionLoader.getExtensionLoader(MetadataReportFactory.class).getAdaptiveExtension();
metadataReport = metadataReportFactory.getMetadataReport(this.metadataReportUrl);

}


public static MetadataReportService instance(Supplier<URL> metadataReportUrl) {
if (metadataReportService == null) {
synchronized (lock) {
synchronized (LOCK) {
if (metadataReportService == null) {
URL metadataReportURLTmp = metadataReportUrl.get();
if (metadataReportURLTmp == null) {
Expand All @@ -76,18 +82,16 @@ public static MetadataReportService instance(Supplier<URL> metadataReportUrl) {
}

public void publishProvider(URL providerUrl) throws RpcException {
//first add into the list
// first add into the list
// remove the individul param
providerUrl = providerUrl.removeParameters(Constants.PID_KEY, Constants.TIMESTAMP_KEY, Constants.BIND_IP_KEY, Constants.BIND_PORT_KEY, Constants.TIMESTAMP_KEY);
providerUrl = removeIndividulParameters(providerUrl);

try {
String interfaceName = providerUrl.getParameter(Constants.INTERFACE_KEY);
if (StringUtils.isNotEmpty(interfaceName)) {
Class interfaceClass = Class.forName(interfaceName);
FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(interfaceClass, providerUrl.getParameters());
metadataReport.storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(),
providerUrl.getParameter(Constants.VERSION_KEY), providerUrl.getParameter(Constants.GROUP_KEY),
Constants.PROVIDER_SIDE,providerUrl.getParameter(Constants.APPLICATION_KEY)), fullServiceDefinition);
metadataReport.storeProviderMetadata(generateMetadataIdentifierByURL(providerUrl, Constants.PROVIDER_SIDE), fullServiceDefinition);
return;
}
logger.error("publishProvider interfaceName is empty . providerUrl: " + providerUrl.toFullString());
Expand All @@ -97,11 +101,39 @@ public void publishProvider(URL providerUrl) throws RpcException {
}
}




public void publishConsumer(URL consumerURL) throws RpcException {
consumerURL = consumerURL.removeParameters(Constants.PID_KEY, Constants.TIMESTAMP_KEY, Constants.BIND_IP_KEY, Constants.BIND_PORT_KEY, Constants.TIMESTAMP_KEY);
metadataReport.storeConsumerMetadata(new MetadataIdentifier(consumerURL.getServiceInterface(),
consumerURL.getParameter(Constants.VERSION_KEY), consumerURL.getParameter(Constants.GROUP_KEY),Constants.CONSUMER_SIDE,
consumerURL.getParameter(Constants.APPLICATION_KEY)), consumerURL.getParameters());
consumerURL = removeIndividulParameters(consumerURL);
metadataReport.storeConsumerMetadata(generateMetadataIdentifierByURL(consumerURL, Constants.CONSUMER_SIDE),
consumerURL.getParameters());
}

/**
* according to {@link URL} to generate {@link MetadataIdentifier}
*
* @param url {@link URL}
* @param side provider/consumer
* @return MetadataIdentifier
*/
private MetadataIdentifier generateMetadataIdentifierByURL(URL url, String side) {
return new MetadataIdentifier(url.getServiceInterface(),
url.getParameter(Constants.VERSION_KEY), url.getParameter(Constants.GROUP_KEY),
side, url.getParameter(Constants.APPLICATION_KEY));
}

/**
* remove the individul param
*
* <p>such as {pid_key, timestamp, etc..}, those keys do not need to store metadata center,
* because it can not to help user to know something. </p>
*
* @param url {@link URL}
* @return url
*/
private URL removeIndividulParameters(URL url) {
return url.removeParameters(Constants.PID_KEY, Constants.TIMESTAMP_KEY, Constants.BIND_IP_KEY, Constants.BIND_PORT_KEY, Constants.TIMESTAMP_KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
*/
package org.apache.dubbo.metadata.store.redis;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.metadata.identifier.MetadataIdentifier;
import org.apache.dubbo.metadata.store.redis.support.RedisUrlUtils;
import org.apache.dubbo.metadata.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.RpcException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.*;

import java.util.Set;

/**
* RedisMetadataReport
Expand All @@ -33,11 +35,26 @@ public class RedisMetadataReport extends AbstractMetadataReport {

private final static Logger logger = LoggerFactory.getLogger(RedisMetadataReport.class);

final JedisPool pool;
JedisPool pool;

JedisCluster cluster;

public RedisMetadataReport(URL url) {
super(url);
pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort());
if (isSingleton(url)) {
pool = new JedisPool(parsePoolConfig(url), url.getHost(), url.getPort());
} else {
Set<HostAndPort> hostAndPortsSet = RedisUrlUtils.parseHostAndPorts(url);
cluster = new JedisCluster(hostAndPortsSet, parsePoolConfig(url));
}
}

private boolean isSingleton(URL url) {
// if address contains split char ",", return false
if (Constants.COMMA_SPLIT_PATTERN.matcher(url.getAddress()).find()) {
return false;
}
return url.getParameter("cluster") == null || "false".equals(url.getParameter("cluster"));
}

@Override
Expand All @@ -51,13 +68,32 @@ protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdenti
}

private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
try (Jedis jedis = pool.getResource()) {
jedis.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v);
try {
doStoreMetadata(metadataIdentifier, v);
} catch (Throwable e) {
logger.error("Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage(), e);
throw new RpcException("Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage(), e);
}
}

private void doStoreMetadata(MetadataIdentifier metadataIdentifier, String v) {
if (cluster == null) {
try (Jedis jedis = pool.getResource()) {
jedis.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v);
} catch (Throwable e) {
throw e;
}
} else {
cluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v);
}
}

private JedisPoolConfig parsePoolConfig(URL url) {
JedisPoolConfig jpc = new JedisPoolConfig();
//FIXME should add some config
return jpc;
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata.store.redis.support;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import redis.clients.jedis.HostAndPort;

import java.util.LinkedHashSet;
import java.util.Set;

/**
* redis url helper
*
* @author wuhulala
* @date 2019/4/7
*/
public class RedisUrlUtils {

/**
* parse url
*
* @param url url such as redis://127.0.0.1:6379,127.0.0.1:6380
* @return such as return {HostAndPort("127.0.0.1",6379), HostAndPort("127.0.0.1",6380)}
*/
public static Set<HostAndPort> parseHostAndPorts(URL url) {
Set<HostAndPort> hostAndPortsSet = new LinkedHashSet<>(32);
String hapStr = url.getAddress();
if (StringUtils.isNotEmpty(hapStr)) {
String[] nodes = Constants.COMMA_SPLIT_PATTERN.split(hapStr);
for (String node : nodes) {
int splitIndex = node.indexOf(":");
String host = node.substring(0, splitIndex);
int port = Integer.valueOf(node.substring(splitIndex + 1));
hostAndPortsSet.add(new HostAndPort(host, port));
}
}
return hostAndPortsSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.metadata.definition.ServiceDefinitionBuilder;
import org.apache.dubbo.metadata.definition.model.FullServiceDefinition;
import org.apache.dubbo.metadata.identifier.MetadataIdentifier;
Expand All @@ -28,12 +29,18 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.dubbo.common.Constants.SYNC_REPORT_KEY;
import static org.apache.dubbo.metadata.store.MetadataReport.META_DATA_STORE_TAG;
Expand Down Expand Up @@ -77,7 +84,7 @@ private void testStoreProvider(RedisMetadataReport redisMetadataReport, String v
String interfaceName = "org.apache.dubbo.metadata.store.redis.RedisMetadata4TstService";
String group = null;
String application = "vic.redis.md";
MetadataIdentifier providerMetadataIdentifier = storePrivider(redisMetadataReport, interfaceName, version, group, application);
MetadataIdentifier providerMetadataIdentifier = storeProvider(redisMetadataReport, interfaceName, version, group, application);
Jedis jedis = null;
try {
jedis = redisMetadataReport.pool.getResource();
Expand Down Expand Up @@ -138,7 +145,7 @@ private void testStoreConsumer(RedisMetadataReport redisMetadataReport, String v
}
}

private MetadataIdentifier storePrivider(RedisMetadataReport redisMetadataReport, String interfaceName, String version, String group, String application) throws ClassNotFoundException {
private MetadataIdentifier storeProvider(RedisMetadataReport redisMetadataReport, String interfaceName, String version, String group, String application) throws ClassNotFoundException {
URL url = URL.valueOf("xxx://" + NetUtils.getLocalAddress().getHostName() + ":4444/" + interfaceName + "?paramTest=redisTest&version=" + version + "&application="
+ application + (group == null ? "" : "&group=" + group));

Expand Down Expand Up @@ -173,4 +180,29 @@ private MetadataIdentifier storeConsumer(RedisMetadataReport redisMetadataReport
return consumerMetadataIdentifier;
}

public static class RedisMetadataReportInnerTest {

@Test
public void testParseSingletonUrl() {
URL url = URL.valueOf("redis://127.0.0.1:6379");
RedisMetadataReport redisMetadataReport = new RedisMetadataReport(url);
Assertions.assertNotNull(redisMetadataReport.pool);
}

@Test
public void testParseClusterUrlWithClusterConfig() {
URL url = URL.valueOf("redis://127.0.0.1:6379?cluster=true");
RedisMetadataReport redisMetadataReport = new RedisMetadataReport(url);
Assertions.assertNotNull(redisMetadataReport.cluster);
}

@Test
public void testParseClusterUrl() {
URL url = URL.valueOf("redis://127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384");
RedisMetadataReport redisMetadataReport = new RedisMetadataReport(url);
Assertions.assertNotNull(redisMetadataReport.cluster);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata.store.redis.support;

import org.apache.dubbo.common.URL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;

import java.util.Arrays;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

class RedisUrlUtilsTest {

@Test
void parseHostAndPorts() {
URL url = URL.valueOf("redis://127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384");
Set<HostAndPort> hostAndPorts = RedisUrlUtils.parseHostAndPorts(url);

Assertions.assertIterableEquals(hostAndPorts, Arrays.asList(
new HostAndPort("127.0.0.1", 6379),
new HostAndPort("127.0.0.1", 6380),
new HostAndPort("127.0.0.1", 6381),
new HostAndPort("127.0.0.1", 6382),
new HostAndPort("127.0.0.1", 6383),
new HostAndPort("127.0.0.1", 6384)
));
}
}