Skip to content

Commit

Permalink
[ISSUE apache#8806] fix autoBatch bug when connecting multiple Rocket…
Browse files Browse the repository at this point in the history
…MQ clusters. (apache#8807)
  • Loading branch information
luozongle01 authored Oct 11, 2024
1 parent e75554d commit 11f0002
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ public void start(final boolean startFactory) throws MQClientException {

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

defaultMQProducer.initProduceAccumulator();

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;

/**
* Maximum hold time of accumulator.
*/
private int batchMaxDelayMs = -1;

/**
* Maximum accumulation message body size for a single messageAccumulation.
*/
private long batchMaxBytes = -1;

/**
* Maximum message body size for produceAccumulator.
*/
private long totalBatchMaxBytes = -1;

private RPCHook rpcHook = null;

/**
Expand Down Expand Up @@ -293,7 +308,6 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
Expand All @@ -320,7 +334,6 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
Expand Down Expand Up @@ -1168,10 +1181,10 @@ public int getBatchMaxDelayMs() {
}

public void batchMaxDelayMs(int holdMs) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.batchMaxDelayMs = holdMs;
if (this.produceAccumulator != null) {
this.produceAccumulator.batchMaxDelayMs(holdMs);
}
this.produceAccumulator.batchMaxDelayMs(holdMs);
}

public long getBatchMaxBytes() {
Expand All @@ -1182,10 +1195,10 @@ public long getBatchMaxBytes() {
}

public void batchMaxBytes(long holdSize) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.batchMaxBytes = holdSize;
if (this.produceAccumulator != null) {
this.produceAccumulator.batchMaxBytes(holdSize);
}
this.produceAccumulator.batchMaxBytes(holdSize);
}

public long getTotalBatchMaxBytes() {
Expand All @@ -1196,10 +1209,10 @@ public long getTotalBatchMaxBytes() {
}

public void totalBatchMaxBytes(long totalHoldSize) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.totalBatchMaxBytes = totalHoldSize;
if (this.produceAccumulator != null) {
this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}
this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}

public boolean getAutoBatch() {
Expand All @@ -1210,9 +1223,6 @@ public boolean getAutoBatch() {
}

public void setAutoBatch(boolean autoBatch) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
}
this.autoBatch = autoBatch;
}

Expand Down Expand Up @@ -1439,4 +1449,21 @@ public void setCompressType(CompressionType compressType) {
public Compressor getCompressor() {
return compressor;
}

public void initProduceAccumulator() {
this.produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);

if (this.batchMaxDelayMs > -1) {
this.produceAccumulator.batchMaxDelayMs(this.batchMaxDelayMs);
}

if (this.batchMaxBytes > -1) {
this.produceAccumulator.batchMaxBytes(this.batchMaxBytes);
}

if (this.totalBatchMaxBytes > -1) {
this.produceAccumulator.totalBatchMaxBytes(this.totalBatchMaxBytes);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -659,29 +660,29 @@ public void assertCreateDefaultMQProducer() {
assertNotNull(producer1);
assertEquals(producerGroupTemp, producer1.getProducerGroup());
assertNotNull(producer1.getDefaultMQProducerImpl());
assertTrue(producer1.getTotalBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxDelayMs() > 0);
assertEquals(0, producer1.getTotalBatchMaxBytes());
assertEquals(0, producer1.getBatchMaxBytes());
assertEquals(0, producer1.getBatchMaxDelayMs());
assertNull(producer1.getTopics());
assertFalse(producer1.isEnableTrace());
assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class));
assertNotNull(producer2);
assertEquals(producerGroupTemp, producer2.getProducerGroup());
assertNotNull(producer2.getDefaultMQProducerImpl());
assertTrue(producer2.getTotalBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxDelayMs() > 0);
assertEquals(0, producer2.getTotalBatchMaxBytes());
assertEquals(0, producer2.getBatchMaxBytes());
assertEquals(0, producer2.getBatchMaxDelayMs());
assertNull(producer2.getTopics());
assertFalse(producer2.isEnableTrace());
assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
DefaultMQProducer producer3 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"));
assertNotNull(producer3);
assertEquals(producerGroupTemp, producer3.getProducerGroup());
assertNotNull(producer3.getDefaultMQProducerImpl());
assertTrue(producer3.getTotalBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxDelayMs() > 0);
assertEquals(0, producer3.getTotalBatchMaxBytes());
assertEquals(0, producer3.getBatchMaxBytes());
assertEquals(0, producer3.getBatchMaxDelayMs());
assertNotNull(producer3.getTopics());
assertEquals(1, producer3.getTopics().size());
assertFalse(producer3.isEnableTrace());
Expand All @@ -690,19 +691,19 @@ public void assertCreateDefaultMQProducer() {
assertNotNull(producer4);
assertEquals(producerGroupTemp, producer4.getProducerGroup());
assertNotNull(producer4.getDefaultMQProducerImpl());
assertTrue(producer4.getTotalBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxDelayMs() > 0);
assertEquals(0, producer4.getTotalBatchMaxBytes());
assertEquals(0, producer4.getBatchMaxBytes());
assertEquals(0, producer4.getBatchMaxDelayMs());
assertNull(producer4.getTopics());
assertTrue(producer4.isEnableTrace());
assertEquals("custom_trace_topic", producer4.getTraceTopic());
DefaultMQProducer producer5 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"), true, "custom_trace_topic");
assertNotNull(producer5);
assertEquals(producerGroupTemp, producer5.getProducerGroup());
assertNotNull(producer5.getDefaultMQProducerImpl());
assertTrue(producer5.getTotalBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxDelayMs() > 0);
assertEquals(0, producer5.getTotalBatchMaxBytes());
assertEquals(0, producer5.getBatchMaxBytes());
assertEquals(0, producer5.getBatchMaxDelayMs());
assertNotNull(producer5.getTopics());
assertEquals(1, producer5.getTopics().size());
assertTrue(producer5.isEnableTrace());
Expand Down Expand Up @@ -810,6 +811,136 @@ public void assertTotalBatchMaxBytes() throws NoSuchFieldException, IllegalAcces
assertEquals(0L, producer.getTotalBatchMaxBytes());
}

@Test
public void assertProduceAccumulatorStart() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
assertEquals(0, producer.getTotalBatchMaxBytes());
assertEquals(0, producer.getBatchMaxBytes());
assertEquals(0, producer.getBatchMaxDelayMs());
assertNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
producer.start();
assertTrue(producer.getTotalBatchMaxBytes() > 0);
assertTrue(producer.getBatchMaxBytes() > 0);
assertTrue(producer.getBatchMaxDelayMs() > 0);
assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
}

@Test
public void assertProduceAccumulatorBeforeStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
producer.totalBatchMaxBytes(64 * 1024 * 100);
producer.batchMaxBytes(64 * 1024);
producer.batchMaxDelayMs(10);

producer.start();
assertEquals(64 * 1024, producer.getBatchMaxBytes());
assertEquals(10, producer.getBatchMaxDelayMs());
assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
}

@Test
public void assertProduceAccumulatorAfterStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
producer.start();

assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));

producer.totalBatchMaxBytes(64 * 1024 * 100);
producer.batchMaxBytes(64 * 1024);
producer.batchMaxDelayMs(10);

assertEquals(64 * 1024, producer.getBatchMaxBytes());
assertEquals(10, producer.getBatchMaxDelayMs());
}

@Test
public void assertProduceAccumulatorUnit() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
producer1.setUnitName("unit1");
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp);
producer2.setUnitName("unit2");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertNotEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulator() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("instanceName1");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("instanceName2");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertNotEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulatorInstanceEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("equalInstance");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("equalInstance");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulatorInstanceAndUnitNameEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("equalInstance");
producer1.setUnitName("equalUnitName");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("equalInstance");
producer2.setUnitName("equalUnitName");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertGetRetryResponseCodes() {
assertNotNull(producer.getRetryResponseCodes());
Expand Down Expand Up @@ -875,4 +1006,11 @@ private void setField(final Object target, final String fieldName, final Object
field.setAccessible(true);
field.set(target, newValue);
}

private <T> T getField(final Object target, final String fieldName, final Class<T> fieldClassType) throws NoSuchFieldException, IllegalAccessException {
Class<?> targetClazz = target.getClass();
Field field = targetClazz.getDeclaredField(fieldName);
field.setAccessible(true);
return fieldClassType.cast(field.get(target));
}
}

0 comments on commit 11f0002

Please sign in to comment.