Skip to content

Commit

Permalink
Merge branch 'develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe authored Sep 19, 2024
2 parents db62acd + 280804c commit eaf8c85
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 78 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Build distribution tar
run: |
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: rocketmq
Expand All @@ -30,7 +30,7 @@ jobs:
run: |
mkdir -p ./pr
echo ${{ github.event.number }} > ./pr/NR
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: pr
path: pr/
8 changes: 4 additions & 4 deletions .github/workflows/pr-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
mkdir versionlist
touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`"
sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO}
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: versionlist
Expand Down Expand Up @@ -158,7 +158,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down Expand Up @@ -199,7 +199,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down Expand Up @@ -235,7 +235,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/push-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Build distribution tar
run: |
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: rocketmq
Expand Down Expand Up @@ -72,7 +72,7 @@ jobs:
mkdir versionlist
touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`"
sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO}
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: versionlist
Expand Down Expand Up @@ -163,7 +163,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down Expand Up @@ -204,7 +204,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down Expand Up @@ -240,7 +240,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/snapshot-automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
MAVEN_SETTINGS: ${{ github.workspace }}/.github/asf-deploy-settings.xml
run: |
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: rocketmq
Expand Down Expand Up @@ -110,7 +110,7 @@ jobs:
mkdir versionlist
touch versionlist/"${version}-`echo ${{ matrix.base-image }} | sed -e "s/:/-/g"`"
sh ./build-image-local.sh ${version} ${{ matrix.base-image }} ${{ matrix.java-version }} ${DOCKER_REPO}
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
name: Upload distribution tar
with:
name: versionlist
Expand Down Expand Up @@ -200,7 +200,7 @@ jobs:
annotate_only: true
include_passed: true
detailed_summary: true
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
if: always()
name: Upload test log
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
subject = User.of(fields.get(SessionCredentials.ACCESS_KEY));
}
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(context.channel());
String sourceIp = StringUtils.substringBefore(remoteAddr, CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON);

Resource topic;
Resource group;
Expand Down Expand Up @@ -394,7 +394,7 @@ private List<DefaultAuthorizationContext> newContext(Metadata metadata, QueryRou
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
Resource resource = Resource.ofTopic(topic.getName());
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Arrays.asList(Action.PUB, Action.SUB), sourceIp);
return Collections.singletonList(context);
}
Expand Down Expand Up @@ -437,7 +437,7 @@ private static List<DefaultAuthorizationContext> newPubContext(Metadata metadata
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
Resource resource = Resource.ofTopic(topic.getName());
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Action.PUB, sourceIp);
return Collections.singletonList(context);
}
Expand Down Expand Up @@ -483,7 +483,7 @@ private static List<DefaultAuthorizationContext> newSubContexts(Metadata metadat
if (metadata.containsKey(GrpcConstants.AUTHORIZATION_AK)) {
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
result.add(DefaultAuthorizationContext.of(subject, resource, Action.SUB, sourceIp));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static AuthorizationEvaluator getEvaluator(AuthConfig config, Supplier<?>
public static AuthorizationStrategy getStrategy(AuthConfig config, Supplier<?> metadataService) {
try {
Class<? extends AuthorizationStrategy> clazz = StatelessAuthorizationStrategy.class;
if (StringUtils.isNotBlank(config.getAuthenticationStrategy())) {
if (StringUtils.isNotBlank(config.getAuthorizationStrategy())) {
clazz = (Class<? extends AuthorizationStrategy>) Class.forName(config.getAuthorizationStrategy());
}
return clazz.getDeclaredConstructor(AuthConfig.class, Supplier.class).newInstance(config, metadataService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ private enum SubscriptionType {
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
Expand Down Expand Up @@ -293,6 +289,8 @@ public synchronized void start() throws MQClientException {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}

initScheduledThreadPoolExecutor();

initMQClientFactory();

initRebalanceImpl();
Expand Down Expand Up @@ -324,6 +322,13 @@ public synchronized void start() throws MQClientException {
}
}

private void initScheduledThreadPoolExecutor() {
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
}

private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ public void setSemaphoreAsyncSendSize(int size) {
semaphoreAsyncSendSize = new Semaphore(size, true);
}

public int getSemaphoreAsyncSendNumAvailablePermits() {
return semaphoreAsyncSendNum == null ? 0 : semaphoreAsyncSendNum.availablePermits();
}

public int getSemaphoreAsyncSendSizeAvailablePermits() {
return semaphoreAsyncSendSize == null ? 0 : semaphoreAsyncSendSize.availablePermits();
}

public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
Expand Down Expand Up @@ -563,7 +571,7 @@ public void run() {

class BackpressureSendCallBack implements SendCallback {
public boolean isSemaphoreAsyncSizeAcquired = false;
public boolean isSemaphoreAsyncNumbAcquired = false;
public boolean isSemaphoreAsyncNumAcquired = false;
public int msgLen;
private final SendCallback sendCallback;

Expand All @@ -573,24 +581,49 @@ public BackpressureSendCallBack(final SendCallback sendCallback) {

@Override
public void onSuccess(SendResult sendResult) {
if (isSemaphoreAsyncSizeAcquired) {
semaphoreAsyncSendSize.release(msgLen);
}
if (isSemaphoreAsyncNumbAcquired) {
semaphoreAsyncSendNum.release();
}
semaphoreProcessor();
sendCallback.onSuccess(sendResult);
}

@Override
public void onException(Throwable e) {
semaphoreProcessor();
sendCallback.onException(e);
}

public void semaphoreProcessor() {
if (isSemaphoreAsyncSizeAcquired) {
defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
semaphoreAsyncSendSize.release(msgLen);
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
}
if (isSemaphoreAsyncNumbAcquired) {
if (isSemaphoreAsyncNumAcquired) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
semaphoreAsyncSendNum.release();
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
}
sendCallback.onException(e);
}

public void semaphoreAsyncAdjust(int semaphoreAsyncNum, int semaphoreAsyncSize) throws InterruptedException {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
if (semaphoreAsyncNum > 0) {
semaphoreAsyncSendNum.release(semaphoreAsyncNum);
} else {
semaphoreAsyncSendNum.acquire(- semaphoreAsyncNum);
}
defaultMQProducer.setBackPressureForAsyncSendNumInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendNum()
+ semaphoreAsyncNum);
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();

defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
if (semaphoreAsyncSize > 0) {
semaphoreAsyncSendSize.release(semaphoreAsyncSize);
} else {
semaphoreAsyncSendSize.acquire(- semaphoreAsyncSize);
}
defaultMQProducer.setBackPressureForAsyncSendSizeInsideAdjust(defaultMQProducer.getBackPressureForAsyncSendSize()
+ semaphoreAsyncSize);
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
}
}

Expand All @@ -599,32 +632,40 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumbAcquired = false;
boolean isSemaphoreAsyncNumAcquired = false;
boolean isSemaphoreAsyncSizeAcquired = false;
int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
sendCallback.msgLen = msgLen;

try {
if (isEnableBackpressureForAsyncMode) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumbAcquired = timeout - costTime > 0

isSemaphoreAsyncNumAcquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncNumbAcquired) {
sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
if (!isSemaphoreAsyncNumAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}

defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
costTime = System.currentTimeMillis() - beginStartTime;

isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
if (!isSemaphoreAsyncSizeAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
sendCallback.isSemaphoreAsyncNumbAcquired = isSemaphoreAsyncNumbAcquired;
sendCallback.msgLen = msgLen;

executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.lock;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadWriteCASLock {
//true : can lock ; false : not lock
private final AtomicBoolean writeLock = new AtomicBoolean(true);

private final AtomicInteger readLock = new AtomicInteger(0);

public void acquireWriteLock() {
boolean isLock = false;
do {
isLock = writeLock.compareAndSet(true, false);
} while (!isLock);

do {
isLock = readLock.get() == 0;
} while (!isLock);
}

public void releaseWriteLock() {
this.writeLock.compareAndSet(false, true);
}

public void acquireReadLock() {
boolean isLock = false;
do {
isLock = writeLock.get();
} while (!isLock);
readLock.getAndIncrement();
}

public void releaseReadLock() {
this.readLock.getAndDecrement();
}

public boolean getWriteLock() {
return this.writeLock.get() && this.readLock.get() == 0;
}

public boolean getReadLock() {
return this.writeLock.get();
}

}
Loading

0 comments on commit eaf8c85

Please sign in to comment.