diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml index e00f43ccc5ff..99d7309fd0cf 100644 --- a/.github/workflows/pr-ci.yml +++ b/.github/workflows/pr-ci.yml @@ -21,7 +21,7 @@ jobs: - name: Build distribution tar run: | mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: rocketmq @@ -30,7 +30,7 @@ jobs: run: | mkdir -p ./pr echo ${{ github.event.number }} > ./pr/NR - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 with: name: pr path: pr/ diff --git a/.github/workflows/pr-e2e-test.yml b/.github/workflows/pr-e2e-test.yml index f9bb3bde75ae..b1ff83eec39f 100644 --- a/.github/workflows/pr-e2e-test.yml +++ b/.github/workflows/pr-e2e-test.yml @@ -68,7 +68,7 @@ jobs: mkdir versionlist touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`" sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO} - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: versionlist @@ -158,7 +158,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: @@ -199,7 +199,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: @@ -235,7 +235,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: diff --git a/.github/workflows/push-ci.yml b/.github/workflows/push-ci.yml index 2fe62dbeb069..a522241a0ac8 100644 --- a/.github/workflows/push-ci.yml +++ b/.github/workflows/push-ci.yml @@ -31,7 +31,7 @@ jobs: - name: Build distribution tar run: | mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: rocketmq @@ -72,7 +72,7 @@ jobs: mkdir versionlist touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`" sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO} - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: versionlist @@ -163,7 +163,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: @@ -204,7 +204,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: @@ -240,7 +240,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: diff --git a/.github/workflows/snapshot-automation.yml b/.github/workflows/snapshot-automation.yml index 99855d3aa0d0..63b19417fe02 100644 --- a/.github/workflows/snapshot-automation.yml +++ b/.github/workflows/snapshot-automation.yml @@ -69,7 +69,7 @@ jobs: MAVEN_SETTINGS: ${{ github.workspace }}/.github/asf-deploy-settings.xml run: | mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: rocketmq @@ -110,7 +110,7 @@ jobs: mkdir versionlist touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`" sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO} - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 name: Upload distribution tar with: name: versionlist @@ -200,7 +200,7 @@ jobs: annotate_only: true include_passed: true detailed_summary: true - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() name: Upload test log with: diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java index 02d5df236f56..e69abdaf8058 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java @@ -171,7 +171,7 @@ public List build(ChannelHandlerContext context, Re subject = User.of(fields.get(SessionCredentials.ACCESS_KEY)); } String remoteAddr = RemotingHelper.parseChannelRemoteAddr(context.channel()); - String sourceIp = StringUtils.substringBefore(remoteAddr, CommonConstants.COLON); + String sourceIp = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON); Resource topic; Resource group; @@ -394,7 +394,7 @@ private List newContext(Metadata metadata, QueryRou subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK)); } Resource resource = Resource.ofTopic(topic.getName()); - String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); + String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Arrays.asList(Action.PUB, Action.SUB), sourceIp); return Collections.singletonList(context); } @@ -437,7 +437,7 @@ private static List newPubContext(Metadata metadata subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK)); } Resource resource = Resource.ofTopic(topic.getName()); - String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); + String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Action.PUB, sourceIp); return Collections.singletonList(context); } @@ -483,7 +483,7 @@ private static List newSubContexts(Metadata metadat if (metadata.containsKey(GrpcConstants.AUTHORIZATION_AK)) { subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK)); } - String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); + String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON); result.add(DefaultAuthorizationContext.of(subject, resource, Action.SUB, sourceIp)); return result; } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/factory/AuthorizationFactory.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/factory/AuthorizationFactory.java index f87a5304cb74..29748a9ed448 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/factory/AuthorizationFactory.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/factory/AuthorizationFactory.java @@ -105,7 +105,7 @@ public static AuthorizationEvaluator getEvaluator(AuthConfig config, Supplier public static AuthorizationStrategy getStrategy(AuthConfig config, Supplier metadataService) { try { Class clazz = StatelessAuthorizationStrategy.class; - if (StringUtils.isNotBlank(config.getAuthenticationStrategy())) { + if (StringUtils.isNotBlank(config.getAuthorizationStrategy())) { clazz = (Class) Class.forName(config.getAuthorizationStrategy()); } return clazz.getDeclaredConstructor(AuthConfig.class, Supplier.class).newInstance(config, metadataService); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index a3276cd7823a..3f90b67ec996 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -164,10 +164,6 @@ private enum SubscriptionType { public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { this.defaultLitePullConsumer = defaultLitePullConsumer; this.rpcHook = rpcHook; - this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - this.defaultLitePullConsumer.getPullThreadNums(), - new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) - ); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread")); this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); } @@ -293,6 +289,8 @@ public synchronized void start() throws MQClientException { this.defaultLitePullConsumer.changeInstanceNameToPID(); } + initScheduledThreadPoolExecutor(); + initMQClientFactory(); initRebalanceImpl(); @@ -324,6 +322,13 @@ public synchronized void start() throws MQClientException { } } + private void initScheduledThreadPoolExecutor() { + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( + this.defaultLitePullConsumer.getPullThreadNums(), + new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) + ); + } + private void initMQClientFactory() throws MQClientException { this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 0e70ee25951a..74a2516174a8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -194,6 +194,14 @@ public void setSemaphoreAsyncSendSize(int size) { semaphoreAsyncSendSize = new Semaphore(size, true); } + public int getSemaphoreAsyncSendNumAvailablePermits() { + return semaphoreAsyncSendNum == null ? 0 : semaphoreAsyncSendNum.availablePermits(); + } + + public int getSemaphoreAsyncSendSizeAvailablePermits() { + return semaphoreAsyncSendSize == null ? 0 : semaphoreAsyncSendSize.availablePermits(); + } + public void initTransactionEnv() { TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; if (producer.getExecutorService() != null) { @@ -563,7 +571,7 @@ public void run() { class BackpressureSendCallBack implements SendCallback { public boolean isSemaphoreAsyncSizeAcquired = false; - public boolean isSemaphoreAsyncNumbAcquired = false; + public boolean isSemaphoreAsyncNumAcquired = false; public int msgLen; private final SendCallback sendCallback; @@ -573,24 +581,49 @@ public BackpressureSendCallBack(final SendCallback sendCallback) { @Override public void onSuccess(SendResult sendResult) { - if (isSemaphoreAsyncSizeAcquired) { - semaphoreAsyncSendSize.release(msgLen); - } - if (isSemaphoreAsyncNumbAcquired) { - semaphoreAsyncSendNum.release(); - } + semaphoreProcessor(); sendCallback.onSuccess(sendResult); } @Override public void onException(Throwable e) { + semaphoreProcessor(); + sendCallback.onException(e); + } + + public void semaphoreProcessor() { if (isSemaphoreAsyncSizeAcquired) { + defaultMQProducer.acquireBackPressureForAsyncSendSizeLock(); semaphoreAsyncSendSize.release(msgLen); + defaultMQProducer.releaseBackPressureForAsyncSendSizeLock(); } - if (isSemaphoreAsyncNumbAcquired) { + if (isSemaphoreAsyncNumAcquired) { + defaultMQProducer.acquireBackPressureForAsyncSendNumLock(); semaphoreAsyncSendNum.release(); + defaultMQProducer.releaseBackPressureForAsyncSendNumLock(); } - sendCallback.onException(e); + } + + public void semaphoreAsyncAdjust(int semaphoreAsyncNum, int semaphoreAsyncSize) throws InterruptedException { + defaultMQProducer.acquireBackPressureForAsyncSendNumLock(); + if (semaphoreAsyncNum > 0) { + semaphoreAsyncSendNum.release(semaphoreAsyncNum); + } else { + semaphoreAsyncSendNum.acquire(- semaphoreAsyncNum); + } + defaultMQProducer.setBackPressureForAsyncSendNumInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendNum() + + semaphoreAsyncNum); + defaultMQProducer.releaseBackPressureForAsyncSendNumLock(); + + defaultMQProducer.acquireBackPressureForAsyncSendSizeLock(); + if (semaphoreAsyncSize > 0) { + semaphoreAsyncSendSize.release(semaphoreAsyncSize); + } else { + semaphoreAsyncSendSize.acquire(- semaphoreAsyncSize); + } + defaultMQProducer.setBackPressureForAsyncSendSizeInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendSize() + + semaphoreAsyncSize); + defaultMQProducer.releaseBackPressureForAsyncSendSizeLock(); } } @@ -599,32 +632,40 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final throws MQClientException, InterruptedException { ExecutorService executor = this.getAsyncSenderExecutor(); boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode(); - boolean isSemaphoreAsyncNumbAcquired = false; + boolean isSemaphoreAsyncNumAcquired = false; boolean isSemaphoreAsyncSizeAcquired = false; int msgLen = msg.getBody() == null ? 1 : msg.getBody().length; + sendCallback.msgLen = msgLen; try { if (isEnableBackpressureForAsyncMode) { + defaultMQProducer.acquireBackPressureForAsyncSendNumLock(); long costTime = System.currentTimeMillis() - beginStartTime; - isSemaphoreAsyncNumbAcquired = timeout - costTime > 0 + + isSemaphoreAsyncNumAcquired = timeout - costTime > 0 && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS); - if (!isSemaphoreAsyncNumbAcquired) { + sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired; + defaultMQProducer.releaseBackPressureForAsyncSendNumLock(); + if (!isSemaphoreAsyncNumAcquired) { sendCallback.onException( new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout")); return; } + + defaultMQProducer.acquireBackPressureForAsyncSendSizeLock(); costTime = System.currentTimeMillis() - beginStartTime; + isSemaphoreAsyncSizeAcquired = timeout - costTime > 0 && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS); + sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired; + defaultMQProducer.releaseBackPressureForAsyncSendSizeLock(); if (!isSemaphoreAsyncSizeAcquired) { sendCallback.onException( new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout")); return; } } - sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired; - sendCallback.isSemaphoreAsyncNumbAcquired = isSemaphoreAsyncNumbAcquired; - sendCallback.msgLen = msgLen; + executor.submit(runnable); } catch (RejectedExecutionException e) { if (isEnableBackpressureForAsyncMode) { diff --git a/client/src/main/java/org/apache/rocketmq/client/lock/ReadWriteCASLock.java b/client/src/main/java/org/apache/rocketmq/client/lock/ReadWriteCASLock.java new file mode 100644 index 000000000000..3d157313715c --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/lock/ReadWriteCASLock.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.lock; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class ReadWriteCASLock { + //true : can lock ; false : not lock + private final AtomicBoolean writeLock = new AtomicBoolean(true); + + private final AtomicInteger readLock = new AtomicInteger(0); + + public void acquireWriteLock() { + boolean isLock = false; + do { + isLock = writeLock.compareAndSet(true, false); + } while (!isLock); + + do { + isLock = readLock.get() == 0; + } while (!isLock); + } + + public void releaseWriteLock() { + this.writeLock.compareAndSet(false, true); + } + + public void acquireReadLock() { + boolean isLock = false; + do { + isLock = writeLock.get(); + } while (!isLock); + readLock.getAndIncrement(); + } + + public void releaseReadLock() { + this.readLock.getAndDecrement(); + } + + public boolean getWriteLock() { + return this.writeLock.get() && this.readLock.get() == 0; + } + + public boolean getReadLock() { + return this.writeLock.get(); + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index b47c01f6764a..f0842de8ba79 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.lock.ReadWriteCASLock; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl; @@ -175,6 +176,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { private RPCHook rpcHook = null; + /** + * backPressureForAsyncSendNum is guaranteed to be modified at runtime and no new requests are allowed + */ + private final ReadWriteCASLock backPressureForAsyncSendNumLock = new ReadWriteCASLock(); + + /** + * backPressureForAsyncSendSize is guaranteed to be modified at runtime and no new requests are allowed + */ + private final ReadWriteCASLock backPressureForAsyncSendSizeLock = new ReadWriteCASLock(); + /** * Compress level of compress algorithm. */ @@ -1334,18 +1345,64 @@ public int getBackPressureForAsyncSendNum() { return backPressureForAsyncSendNum; } + /** + * For user modify backPressureForAsyncSendNum at runtime + */ public void setBackPressureForAsyncSendNum(int backPressureForAsyncSendNum) { + this.backPressureForAsyncSendNumLock.acquireWriteLock(); + backPressureForAsyncSendNum = Math.max(backPressureForAsyncSendNum, 10); + int acquiredBackPressureForAsyncSendNum = this.backPressureForAsyncSendNum + - defaultMQProducerImpl.getSemaphoreAsyncSendNumAvailablePermits(); this.backPressureForAsyncSendNum = backPressureForAsyncSendNum; - defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum); + defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum - acquiredBackPressureForAsyncSendNum); + this.backPressureForAsyncSendNumLock.releaseWriteLock(); } public int getBackPressureForAsyncSendSize() { return backPressureForAsyncSendSize; } + /** + * For user modify backPressureForAsyncSendSize at runtime + */ public void setBackPressureForAsyncSendSize(int backPressureForAsyncSendSize) { + this.backPressureForAsyncSendSizeLock.acquireWriteLock(); + backPressureForAsyncSendSize = Math.max(backPressureForAsyncSendSize, 1024 * 1024); + int acquiredBackPressureForAsyncSendSize = this.backPressureForAsyncSendSize + - defaultMQProducerImpl.getSemaphoreAsyncSendSizeAvailablePermits(); + this.backPressureForAsyncSendSize = backPressureForAsyncSendSize; + defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize - acquiredBackPressureForAsyncSendSize); + this.backPressureForAsyncSendSizeLock.releaseWriteLock(); + } + + /** + * Used for system internal adjust backPressureForAsyncSendSize + */ + public void setBackPressureForAsyncSendSizeInsideAdjust(int backPressureForAsyncSendSize) { this.backPressureForAsyncSendSize = backPressureForAsyncSendSize; - defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize); + } + + /** + * Used for system internal adjust backPressureForAsyncSendNum + */ + public void setBackPressureForAsyncSendNumInsideAdjust(int backPressureForAsyncSendNum) { + this.backPressureForAsyncSendNum = backPressureForAsyncSendNum; + } + + public void acquireBackPressureForAsyncSendSizeLock() { + this.backPressureForAsyncSendSizeLock.acquireReadLock(); + } + + public void releaseBackPressureForAsyncSendSizeLock() { + this.backPressureForAsyncSendSizeLock.releaseReadLock(); + } + + public void acquireBackPressureForAsyncSendNumLock() { + this.backPressureForAsyncSendNumLock.acquireReadLock(); + } + + public void releaseBackPressureForAsyncSendNumLock() { + this.backPressureForAsyncSendNumLock.releaseReadLock(); } public List getTopics() { diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index be277f69bcf5..4cf899f9708b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -551,6 +551,50 @@ public void testBatchSendMessageSync_Success() throws RemotingException, Interru producer.setAutoBatch(false); } + + @Test + public void testRunningSetBackCompress() throws RemotingException, InterruptedException, MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(5); + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + countDownLatch.countDown(); + } + + @Override + public void onException(Throwable e) { + e.printStackTrace(); + countDownLatch.countDown(); + } + }; + + // on enableBackpressureForAsyncMode + producer.setEnableBackpressureForAsyncMode(true); + producer.setBackPressureForAsyncSendNum(10); + producer.setBackPressureForAsyncSendSize(50 * 1024 * 1024); + Message message = new Message(); + message.setTopic("test"); + message.setBody("hello world".getBytes()); + MessageQueue mq = new MessageQueue("test", "BrokerA", 1); + //this message is send success + for (int i = 0; i < 5; i++) { + new Thread(new Runnable() { + @Override + public void run() { + try { + producer.send(message, mq, sendCallback); + } catch (MQClientException | RemotingException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }).start(); + } + producer.setBackPressureForAsyncSendNum(15); + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(producer.defaultMQProducerImpl.getSemaphoreAsyncSendNumAvailablePermits() + countDownLatch.getCount()).isEqualTo(15); + producer.setEnableBackpressureForAsyncMode(false); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java index 39d7057bddd4..518868831f4c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java @@ -118,11 +118,11 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille } } else { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(inboundChannel); - sourceAddress = StringUtils.substringBefore(remoteAddr, CommonConstants.COLON); + sourceAddress = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON); sourcePort = Integer.parseInt(StringUtils.substringAfterLast(remoteAddr, CommonConstants.COLON)); String localAddr = RemotingHelper.parseChannelLocalAddr(inboundChannel); - destinationAddress = StringUtils.substringBefore(localAddr, CommonConstants.COLON); + destinationAddress = StringUtils.substringBeforeLast(localAddr, CommonConstants.COLON); destinationPort = Integer.parseInt(StringUtils.substringAfterLast(localAddr, CommonConstants.COLON)); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 9f3136195b35..ffa372605942 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -39,8 +39,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.rocketmq.common.AbortProcessException; @@ -393,7 +393,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm responseFuture.release(); } } else { - log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + log.warn("receive response, cmd={}, but not matched any request, address={}, channelId={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), ctx.channel().id()); } } @@ -560,13 +560,13 @@ public void operationFail(Throwable throwable) { return; } requestFail(opaque); - log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); + log.warn("send a request command to channel <{}>, channelId={}, failed.", RemotingHelper.parseChannelRemoteAddr(channel), channel.id()); }); return future; } catch (Exception e) { responseTable.remove(opaque); responseFuture.release(); - log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); + log.warn("send a request command to channel <{}> channelId={} Exception", RemotingHelper.parseChannelRemoteAddr(channel), channel.id(), e); future.completeExceptionally(new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e)); return future; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 41976122b2f2..ef9762ddc67b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -49,7 +49,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.cert.CertificateException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -416,14 +415,14 @@ public void closeChannel(final String addr, final Channel channel) { boolean removeItemFromTable = true; final ChannelWrapper prevCW = this.channelTables.get(addrRemote); - LOGGER.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null); + LOGGER.info("closeChannel: begin close the channel[addr={}, id={}] Found: {}", addrRemote, channel.id(), prevCW != null); if (null == prevCW) { - LOGGER.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote); + LOGGER.info("closeChannel: the channel[addr={}, id={}] has been removed from the channel table before", addrRemote, channel.id()); removeItemFromTable = false; } else if (prevCW.isWrapperOf(channel)) { - LOGGER.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.", - addrRemote); + LOGGER.info("closeChannel: the channel[addr={}, id={}] has been closed before, and has been created again, nothing to do.", + addrRemote, channel.id()); removeItemFromTable = false; } @@ -432,7 +431,7 @@ public void closeChannel(final String addr, final Channel channel) { if (channelWrapper != null && channelWrapper.tryClose(channel)) { this.channelTables.remove(addrRemote); } - LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id()); } RemotingHelper.closeChannel(channel); @@ -471,7 +470,7 @@ public void closeChannel(final Channel channel) { } if (null == prevCW) { - LOGGER.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote); + LOGGER.info("eventCloseChannel: the channel[addr={}, id={}] has been removed from the channel table before", RemotingHelper.parseChannelRemoteAddr(channel), channel.id()); removeItemFromTable = false; } @@ -480,11 +479,11 @@ public void closeChannel(final Channel channel) { if (channelWrapper != null && channelWrapper.tryClose(channel)) { this.channelTables.remove(addrRemote); } - LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id()); RemotingHelper.closeChannel(channel); } } catch (Exception e) { - LOGGER.error("closeChannel: close the channel exception", e); + LOGGER.error("closeChannel: close the channel[id={}] exception", channel.id(), e); } finally { this.lockChannelTables.unlock(); } @@ -562,9 +561,9 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4; if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) { this.closeChannel(addr, channel); - LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr); + LOGGER.warn("invokeSync: close socket because of timeout, {}ms, channel[addr={}, id={}]", timeoutMillis, channelRemoteAddr, channel.id()); } - LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", channelRemoteAddr); + LOGGER.warn("invokeSync: wait response timeout exception, the channel[addr={}, id={}]", channelRemoteAddr, channel.id()); throw e; } } else { @@ -819,10 +818,11 @@ public CompletableFuture invokeImpl(final Channel channel, final RemotingCommand response = responseFuture.getResponseCommand(); if (response.getCode() == ResponseCode.GO_AWAY) { if (nettyClientConfig.isEnableReconnectForGoAway()) { + LOGGER.info("Receive go away from channelId={}, channel={}", channel.id(), channel); ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> { try { - if (channelWrapper0.reconnect()) { - LOGGER.info("Receive go away from channel {}, recreate the channel", channel0); + if (channelWrapper0.reconnect(channel0)) { + LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, channelWrapper0.getChannel().id()); channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0); } } catch (Throwable t) { @@ -830,10 +830,11 @@ public CompletableFuture invokeImpl(final Channel channel, final } return channelWrapper0; }); - if (channelWrapper != null) { + if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) { if (nettyClientConfig.isEnableTransparentRetry()) { RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); retryRequest.setBody(request.getBody()); + retryRequest.setExtFields(request.getExtFields()); if (channelWrapper.isOK()) { long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); stopwatch.stop(); @@ -865,6 +866,8 @@ public CompletableFuture invokeImpl(final Channel channel, final return future; } } + } else { + LOGGER.warn("invokeImpl receive GO_AWAY, channelWrapper is null or channel is the same in wrapper, channelId={}", channel.id()); } } } @@ -1002,7 +1005,6 @@ class ChannelWrapper { // only affected by sync or async request, oneway is not included. private ChannelFuture channelToClose; private long lastResponseTime; - private volatile long lastReconnectTimestamp = 0L; private final String channelAddress; public ChannelWrapper(String address, ChannelFuture channelFuture) { @@ -1021,10 +1023,7 @@ public boolean isWritable() { } public boolean isWrapperOf(Channel channel) { - if (this.channelFuture.channel() != null && this.channelFuture.channel() == channel) { - return true; - } - return false; + return this.channelFuture.channel() != null && this.channelFuture.channel() == channel; } private Channel getChannel() { @@ -1052,20 +1051,27 @@ public String getChannelAddress() { return channelAddress; } - public boolean reconnect() { + public boolean reconnect(Channel channel) { + if (!isWrapperOf(channel)) { + LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id()); + return false; + } if (lock.writeLock().tryLock()) { try { - if (lastReconnectTimestamp == 0L || System.currentTimeMillis() - lastReconnectTimestamp > Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis()) { + if (isWrapperOf(channel)) { channelToClose = channelFuture; String[] hostAndPort = getHostAndPort(channelAddress); channelFuture = fetchBootstrap(channelAddress) .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); - lastReconnectTimestamp = System.currentTimeMillis(); return true; + } else { + LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id()); } } finally { lock.writeLock().unlock(); } + } else { + LOGGER.warn("channelWrapper reconnect try lock fail, now channelId={}", getChannel().id()); } return false; } @@ -1152,7 +1158,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress); + LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}, channelId={}", remoteAddress, ctx.channel().id()); super.channelActive(ctx); if (NettyRemotingClient.this.channelEventListener != null) { @@ -1175,7 +1181,7 @@ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); + LOGGER.info("NETTY CLIENT PIPELINE: CLOSE channel[addr={}, id={}]", remoteAddress, ctx.channel().id()); closeChannel(ctx.channel()); super.close(ctx, promise); NettyRemotingClient.this.failFast(ctx.channel()); @@ -1187,7 +1193,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[{}]", remoteAddress); + LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[addr={}, id={}]", remoteAddress, ctx.channel().id()); closeChannel(ctx.channel()); super.channelInactive(ctx); } @@ -1198,7 +1204,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); + LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception channel[addr={}, id={}]", remoteAddress, ctx.channel().id()); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this @@ -1213,8 +1219,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress); - LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause); + LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught channel[addr={}, id={}]", remoteAddress, ctx.channel().id(), cause); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index f34c6944c992..972e71aadd85 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.util.LibC; import org.rocksdb.RocksDBException; @@ -1903,7 +1904,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner); if (isMultiDispatchMsg) { AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); if (appendMessageResult != null) { @@ -2244,8 +2245,9 @@ public FlushManager getFlushManager() { return flushManager; } - public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { - return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) { + return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && + MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic()); } private boolean isCloseReadAhead() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 20e9a652b7e1..5c74918d9e6f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -175,7 +175,7 @@ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); - if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) { return encodeWithoutProperties(msgInner); }