Skip to content

Commit

Permalink
Support multiple shared links (#2457)
Browse files Browse the repository at this point in the history
* make dubbo support multiple shared links, upgrading RPC throughput

* Fix compilation error

* Fix compilation error

* opti import

* if add {}

* checkstyle fail

* fix getSharedClient referenceCount calculation error bug

* 优化 import

* Fix the problem that the getSharedClient thread is not safe

* Fix the problem that the getSharedClient thread is not safe

* Try fixing ci error, https://travis-ci.org/apache/incubator-dubbo/jobs/453185295

* 将DEFAULT_CONNECTIONS_KEY修改成SERVICE_CONNECTIONS_KEY

* dubbo.xsd add shareconnections attribute,

* Optimize code format

* Fix mult connect ghost connect  problem

* format code

* Remove the concept of ghostClientMap and ghost connection. In fact, ghostClient is LazyConnectExchangeClient. At present, the LazyConnectExchangeClient object is added directly in ReferenceCountExchangeClient to realize the mapping relationship with ReferenceCountExchangeClient. The relationship between previous ghostClient and url mapping is not applicable to the current new share. Multiple connections.

* Optimize the ReferenceCountExchangeClient and remove the reference to the lazyConnectExchangeClient because it doesn't make much sense; add locks in the close operation of the AbstractClient, because connect, disconnect, and close should not be done at the same time.

* format code

* try remove close lock

* Restore close method

* Restore ReferenceCountExchangeClient reference to LazyConnectExchangeClient object

* Optimize the logic of using the LazyConnectExchangeClient inside the ReferenceCountExchangeClient; Supplemental shared multi-connected unit test
  • Loading branch information
yizhenqiang authored and beiwei30 committed Feb 3, 2019
1 parent c2c9de9 commit d2b5914
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,13 @@ public class Constants {

public static final int DEFAULT_ALIVE = 60 * 1000;

public static final int DEFAULT_CONNECTIONS = 0;
/**
* By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set),
* which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection.
*/
public static final String DEFAULT_SHARE_CONNECTIONS = "1";

public static final String SHARE_CONNECTIONS_KEY = "shareconnections";

public static final int DEFAULT_ACCEPTS = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public class ConsumerConfig extends AbstractReferenceConfig {
*/
private Integer queues;

/**
* By default, a TCP long-connection communication is shared between the consumer process and the provider process.
* This property can be set to share multiple TCP long-connection communications. Note that only the dubbo protocol takes effect.
*/
private Integer shareconnections;

@Override
public void setTimeout(Integer timeout) {
super.setTimeout(timeout);
Expand Down Expand Up @@ -118,4 +124,12 @@ public Integer getQueues() {
public void setQueues(Integer queues) {
this.queues = queues;
}

public Integer getShareconnections() {
return shareconnections;
}

public void setShareconnections(Integer shareconnections) {
this.shareconnections = shareconnections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,12 @@
<xsd:documentation><![CDATA[ The thread pool queue size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="shareconnections" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The default share connections. default share one connection. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,38 @@ public void send(Object message, boolean sent) throws RemotingException {
}

protected void connect() throws RemotingException {

connectLock.lock();

try {

if (isConnected()) {
return;
}

doConnect();

if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");

} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}

} catch (RemotingException e) {
throw e;

} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);

} finally {
connectLock.unlock();
}
Expand Down Expand Up @@ -241,23 +249,27 @@ public void reconnect() throws RemotingException {

@Override
public void close() {

try {
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
disconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
doClose();
} catch (Throwable e) {
Expand Down Expand Up @@ -310,5 +322,4 @@ public String toString() {
* @return channel
*/
protected abstract Channel getChannel();

}
Loading

0 comments on commit d2b5914

Please sign in to comment.