Skip to content

Commit

Permalink
[ISSUE #8705] Make MQClientAPIFactory shutdown async (#8706)
Browse files Browse the repository at this point in the history
  • Loading branch information
qianye1001 authored Sep 19, 2024
1 parent 6024db7 commit 0d6c94b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
Expand Down Expand Up @@ -184,9 +185,9 @@
import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
Expand Down Expand Up @@ -247,7 +248,7 @@

import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;

public class MQClientAPIImpl implements NameServerUpdateCallback {
public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(MQClientAPIImpl.class);
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.client.common.NameserverAccessConfig;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand Down Expand Up @@ -85,9 +86,11 @@ public void start() throws Exception {

@Override
public void shutdown() throws Exception {
AsyncShutdownHelper helper = new AsyncShutdownHelper();
for (int i = 0; i < this.clientNum; i++) {
clients[i].shutdown();
helper.addTarget(clients[i]);
}
helper.shutdown().await(Integer.MAX_VALUE, TimeUnit.SECONDS);
}

protected MQClientAPIExt createAndStart(String instanceName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncShutdownHelper {
private final AtomicBoolean shutdown;
private final List<Shutdown> targetList;

private CountDownLatch countDownLatch;

public AsyncShutdownHelper() {
this.targetList = new ArrayList<>();
this.shutdown = new AtomicBoolean(false);
}

public void addTarget(Shutdown target) {
if (shutdown.get()) {
return;
}
targetList.add(target);
}

public AsyncShutdownHelper shutdown() {
if (shutdown.get()) {
return this;
}
if (targetList.isEmpty()) {
return this;
}
this.countDownLatch = new CountDownLatch(targetList.size());
for (Shutdown target : targetList) {
Runnable runnable = () -> {
try {
target.shutdown();
} catch (Exception ignored) {

} finally {
countDownLatch.countDown();
}
};
new Thread(runnable).start();
}
return this;
}

public boolean await(long time, TimeUnit unit) throws InterruptedException {
if (shutdown.get()) {
return false;
}
try {
return this.countDownLatch.await(time, unit);
} finally {
shutdown.compareAndSet(false, true);
}
}
}

0 comments on commit 0d6c94b

Please sign in to comment.