Skip to content

Commit

Permalink
[RIP-70-1] Optimize the back pressure mechanism of the client (#8661)
Browse files Browse the repository at this point in the history
* Fix semaphore exception that failed halfway in the case of asynchronous message sending back pressure

* Fix semaphore exception that failed halfway in the case of asynchronous message sending back pressure

* Fix semaphore exception that failed halfway in the case of asynchronous message sending back pressure

* Fixed variable typo

* Fix semaphore exception that failed halfway in the case of asynchrono

* Fix modifying the semaphore size at run time results in inaccurate semaphore calculations

* optimize lock and introduce new lock

* optimize code typo

* Fix runtime modifications to semaphoreAsyncSendNum and semaphoreAsyncSendSize result in inaccurate actual semaphore values

* fix fail test

* adjust code typo

* increase test

* increase test

* fix test

* fix test
  • Loading branch information
3424672656 authored Sep 18, 2024
1 parent d3e5f70 commit a28c2cb
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ public void setSemaphoreAsyncSendSize(int size) {
semaphoreAsyncSendSize = new Semaphore(size, true);
}

public int getSemaphoreAsyncSendNumAvailablePermits() {
return semaphoreAsyncSendNum == null ? 0 : semaphoreAsyncSendNum.availablePermits();
}

public int getSemaphoreAsyncSendSizeAvailablePermits() {
return semaphoreAsyncSendSize == null ? 0 : semaphoreAsyncSendSize.availablePermits();
}

public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
Expand Down Expand Up @@ -563,7 +571,7 @@ public void run() {

class BackpressureSendCallBack implements SendCallback {
public boolean isSemaphoreAsyncSizeAcquired = false;
public boolean isSemaphoreAsyncNumbAcquired = false;
public boolean isSemaphoreAsyncNumAcquired = false;
public int msgLen;
private final SendCallback sendCallback;

Expand All @@ -573,24 +581,49 @@ public BackpressureSendCallBack(final SendCallback sendCallback) {

@Override
public void onSuccess(SendResult sendResult) {
if (isSemaphoreAsyncSizeAcquired) {
semaphoreAsyncSendSize.release(msgLen);
}
if (isSemaphoreAsyncNumbAcquired) {
semaphoreAsyncSendNum.release();
}
semaphoreProcessor();
sendCallback.onSuccess(sendResult);
}

@Override
public void onException(Throwable e) {
semaphoreProcessor();
sendCallback.onException(e);
}

public void semaphoreProcessor() {
if (isSemaphoreAsyncSizeAcquired) {
defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
semaphoreAsyncSendSize.release(msgLen);
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
}
if (isSemaphoreAsyncNumbAcquired) {
if (isSemaphoreAsyncNumAcquired) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
semaphoreAsyncSendNum.release();
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
}
sendCallback.onException(e);
}

public void semaphoreAsyncAdjust(int semaphoreAsyncNum, int semaphoreAsyncSize) throws InterruptedException {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
if (semaphoreAsyncNum > 0) {
semaphoreAsyncSendNum.release(semaphoreAsyncNum);
} else {
semaphoreAsyncSendNum.acquire(- semaphoreAsyncNum);
}
defaultMQProducer.setBackPressureForAsyncSendNumInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendNum()
+ semaphoreAsyncNum);
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();

defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
if (semaphoreAsyncSize > 0) {
semaphoreAsyncSendSize.release(semaphoreAsyncSize);
} else {
semaphoreAsyncSendSize.acquire(- semaphoreAsyncSize);
}
defaultMQProducer.setBackPressureForAsyncSendSizeInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendSize()
+ semaphoreAsyncSize);
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
}
}

Expand All @@ -599,32 +632,40 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumbAcquired = false;
boolean isSemaphoreAsyncNumAcquired = false;
boolean isSemaphoreAsyncSizeAcquired = false;
int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
sendCallback.msgLen = msgLen;

try {
if (isEnableBackpressureForAsyncMode) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumbAcquired = timeout - costTime > 0

isSemaphoreAsyncNumAcquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncNumbAcquired) {
sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
if (!isSemaphoreAsyncNumAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}

defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
costTime = System.currentTimeMillis() - beginStartTime;

isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
if (!isSemaphoreAsyncSizeAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
sendCallback.isSemaphoreAsyncNumbAcquired = isSemaphoreAsyncNumbAcquired;
sendCallback.msgLen = msgLen;

executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.lock;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadWriteCASLock {
//true : can lock ; false : not lock
private final AtomicBoolean writeLock = new AtomicBoolean(true);

private final AtomicInteger readLock = new AtomicInteger(0);

public void acquireWriteLock() {
boolean isLock = false;
do {
isLock = writeLock.compareAndSet(true, false);
} while (!isLock);

do {
isLock = readLock.get() == 0;
} while (!isLock);
}

public void releaseWriteLock() {
this.writeLock.compareAndSet(false, true);
}

public void acquireReadLock() {
boolean isLock = false;
do {
isLock = writeLock.get();
} while (!isLock);
readLock.getAndIncrement();
}

public void releaseReadLock() {
this.readLock.getAndDecrement();
}

public boolean getWriteLock() {
return this.writeLock.get() && this.readLock.get() == 0;
}

public boolean getReadLock() {
return this.writeLock.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.lock.ReadWriteCASLock;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
Expand Down Expand Up @@ -175,6 +176,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {

private RPCHook rpcHook = null;

/**
* backPressureForAsyncSendNum is guaranteed to be modified at runtime and no new requests are allowed
*/
private final ReadWriteCASLock backPressureForAsyncSendNumLock = new ReadWriteCASLock();

/**
* backPressureForAsyncSendSize is guaranteed to be modified at runtime and no new requests are allowed
*/
private final ReadWriteCASLock backPressureForAsyncSendSizeLock = new ReadWriteCASLock();

/**
* Compress level of compress algorithm.
*/
Expand Down Expand Up @@ -1334,18 +1345,64 @@ public int getBackPressureForAsyncSendNum() {
return backPressureForAsyncSendNum;
}

/**
* For user modify backPressureForAsyncSendNum at runtime
*/
public void setBackPressureForAsyncSendNum(int backPressureForAsyncSendNum) {
this.backPressureForAsyncSendNumLock.acquireWriteLock();
backPressureForAsyncSendNum = Math.max(backPressureForAsyncSendNum, 10);
int acquiredBackPressureForAsyncSendNum = this.backPressureForAsyncSendNum
- defaultMQProducerImpl.getSemaphoreAsyncSendNumAvailablePermits();
this.backPressureForAsyncSendNum = backPressureForAsyncSendNum;
defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum);
defaultMQProducerImpl.setSemaphoreAsyncSendNum(backPressureForAsyncSendNum - acquiredBackPressureForAsyncSendNum);
this.backPressureForAsyncSendNumLock.releaseWriteLock();
}

public int getBackPressureForAsyncSendSize() {
return backPressureForAsyncSendSize;
}

/**
* For user modify backPressureForAsyncSendSize at runtime
*/
public void setBackPressureForAsyncSendSize(int backPressureForAsyncSendSize) {
this.backPressureForAsyncSendSizeLock.acquireWriteLock();
backPressureForAsyncSendSize = Math.max(backPressureForAsyncSendSize, 1024 * 1024);
int acquiredBackPressureForAsyncSendSize = this.backPressureForAsyncSendSize
- defaultMQProducerImpl.getSemaphoreAsyncSendSizeAvailablePermits();
this.backPressureForAsyncSendSize = backPressureForAsyncSendSize;
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize - acquiredBackPressureForAsyncSendSize);
this.backPressureForAsyncSendSizeLock.releaseWriteLock();
}

/**
* Used for system internal adjust backPressureForAsyncSendSize
*/
public void setBackPressureForAsyncSendSizeInsideAdjust(int backPressureForAsyncSendSize) {
this.backPressureForAsyncSendSize = backPressureForAsyncSendSize;
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
}

/**
* Used for system internal adjust backPressureForAsyncSendNum
*/
public void setBackPressureForAsyncSendNumInsideAdjust(int backPressureForAsyncSendNum) {
this.backPressureForAsyncSendNum = backPressureForAsyncSendNum;
}

public void acquireBackPressureForAsyncSendSizeLock() {
this.backPressureForAsyncSendSizeLock.acquireReadLock();
}

public void releaseBackPressureForAsyncSendSizeLock() {
this.backPressureForAsyncSendSizeLock.releaseReadLock();
}

public void acquireBackPressureForAsyncSendNumLock() {
this.backPressureForAsyncSendNumLock.acquireReadLock();
}

public void releaseBackPressureForAsyncSendNumLock() {
this.backPressureForAsyncSendNumLock.releaseReadLock();
}

public List<String> getTopics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,50 @@ public void testBatchSendMessageSync_Success() throws RemotingException, Interru
producer.setAutoBatch(false);
}


@Test
public void testRunningSetBackCompress() throws RemotingException, InterruptedException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(5);
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
}

@Override
public void onException(Throwable e) {
e.printStackTrace();
countDownLatch.countDown();
}
};

// on enableBackpressureForAsyncMode
producer.setEnableBackpressureForAsyncMode(true);
producer.setBackPressureForAsyncSendNum(10);
producer.setBackPressureForAsyncSendSize(50 * 1024 * 1024);
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
MessageQueue mq = new MessageQueue("test", "BrokerA", 1);
//this message is send success
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
producer.send(message, mq, sendCallback);
} catch (MQClientException | RemotingException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
producer.setBackPressureForAsyncSendNum(15);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(producer.defaultMQProducerImpl.getSemaphoreAsyncSendNumAvailablePermits() + countDownLatch.getCount()).isEqualTo(15);
producer.setEnableBackpressureForAsyncMode(false);
}

public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();

Expand Down

0 comments on commit a28c2cb

Please sign in to comment.