diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 74a2516174a..3d4fdbec373 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -250,6 +250,8 @@ public void start(final boolean startFactory) throws MQClientException { this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); + defaultMQProducer.initProduceAccumulator(); + boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index f0842de8ba7..a8bf7cee85f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -174,6 +174,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private int backPressureForAsyncSendSize = 100 * 1024 * 1024; + /** + * Maximum hold time of accumulator. + */ + private int batchMaxDelayMs = -1; + + /** + * Maximum accumulation message body size for a single messageAccumulation. + */ + private long batchMaxBytes = -1; + + /** + * Maximum message body size for produceAccumulator. + */ + private long totalBatchMaxBytes = -1; + private RPCHook rpcHook = null; /** @@ -293,7 +308,6 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List this.enableTrace = enableMsgTrace; this.traceTopic = customizedTraceTopic; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } /** @@ -320,7 +334,6 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC this.producerGroup = producerGroup; this.rpcHook = rpcHook; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } /** @@ -1168,10 +1181,10 @@ public int getBatchMaxDelayMs() { } public void batchMaxDelayMs(int holdMs) { - if (this.produceAccumulator == null) { - throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + this.batchMaxDelayMs = holdMs; + if (this.produceAccumulator != null) { + this.produceAccumulator.batchMaxDelayMs(holdMs); } - this.produceAccumulator.batchMaxDelayMs(holdMs); } public long getBatchMaxBytes() { @@ -1182,10 +1195,10 @@ public long getBatchMaxBytes() { } public void batchMaxBytes(long holdSize) { - if (this.produceAccumulator == null) { - throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + this.batchMaxBytes = holdSize; + if (this.produceAccumulator != null) { + this.produceAccumulator.batchMaxBytes(holdSize); } - this.produceAccumulator.batchMaxBytes(holdSize); } public long getTotalBatchMaxBytes() { @@ -1196,10 +1209,10 @@ public long getTotalBatchMaxBytes() { } public void totalBatchMaxBytes(long totalHoldSize) { - if (this.produceAccumulator == null) { - throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); + this.totalBatchMaxBytes = totalHoldSize; + if (this.produceAccumulator != null) { + this.produceAccumulator.totalBatchMaxBytes(totalHoldSize); } - this.produceAccumulator.totalBatchMaxBytes(totalHoldSize); } public boolean getAutoBatch() { @@ -1210,9 +1223,6 @@ public boolean getAutoBatch() { } public void setAutoBatch(boolean autoBatch) { - if (this.produceAccumulator == null) { - throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); - } this.autoBatch = autoBatch; } @@ -1439,4 +1449,21 @@ public void setCompressType(CompressionType compressType) { public Compressor getCompressor() { return compressor; } + + public void initProduceAccumulator() { + this.produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + + if (this.batchMaxDelayMs > -1) { + this.produceAccumulator.batchMaxDelayMs(this.batchMaxDelayMs); + } + + if (this.batchMaxBytes > -1) { + this.produceAccumulator.batchMaxBytes(this.batchMaxBytes); + } + + if (this.totalBatchMaxBytes > -1) { + this.produceAccumulator.totalBatchMaxBytes(this.totalBatchMaxBytes); + } + + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 4cf899f9708..33cf0df390d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -68,6 +68,7 @@ import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -659,9 +660,9 @@ public void assertCreateDefaultMQProducer() { assertNotNull(producer1); assertEquals(producerGroupTemp, producer1.getProducerGroup()); assertNotNull(producer1.getDefaultMQProducerImpl()); - assertTrue(producer1.getTotalBatchMaxBytes() > 0); - assertTrue(producer1.getBatchMaxBytes() > 0); - assertTrue(producer1.getBatchMaxDelayMs() > 0); + assertEquals(0, producer1.getTotalBatchMaxBytes()); + assertEquals(0, producer1.getBatchMaxBytes()); + assertEquals(0, producer1.getBatchMaxDelayMs()); assertNull(producer1.getTopics()); assertFalse(producer1.isEnableTrace()); assertTrue(UtilAll.isBlank(producer1.getTraceTopic())); @@ -669,9 +670,9 @@ public void assertCreateDefaultMQProducer() { assertNotNull(producer2); assertEquals(producerGroupTemp, producer2.getProducerGroup()); assertNotNull(producer2.getDefaultMQProducerImpl()); - assertTrue(producer2.getTotalBatchMaxBytes() > 0); - assertTrue(producer2.getBatchMaxBytes() > 0); - assertTrue(producer2.getBatchMaxDelayMs() > 0); + assertEquals(0, producer2.getTotalBatchMaxBytes()); + assertEquals(0, producer2.getBatchMaxBytes()); + assertEquals(0, producer2.getBatchMaxDelayMs()); assertNull(producer2.getTopics()); assertFalse(producer2.isEnableTrace()); assertTrue(UtilAll.isBlank(producer2.getTraceTopic())); @@ -679,9 +680,9 @@ public void assertCreateDefaultMQProducer() { assertNotNull(producer3); assertEquals(producerGroupTemp, producer3.getProducerGroup()); assertNotNull(producer3.getDefaultMQProducerImpl()); - assertTrue(producer3.getTotalBatchMaxBytes() > 0); - assertTrue(producer3.getBatchMaxBytes() > 0); - assertTrue(producer3.getBatchMaxDelayMs() > 0); + assertEquals(0, producer3.getTotalBatchMaxBytes()); + assertEquals(0, producer3.getBatchMaxBytes()); + assertEquals(0, producer3.getBatchMaxDelayMs()); assertNotNull(producer3.getTopics()); assertEquals(1, producer3.getTopics().size()); assertFalse(producer3.isEnableTrace()); @@ -690,9 +691,9 @@ public void assertCreateDefaultMQProducer() { assertNotNull(producer4); assertEquals(producerGroupTemp, producer4.getProducerGroup()); assertNotNull(producer4.getDefaultMQProducerImpl()); - assertTrue(producer4.getTotalBatchMaxBytes() > 0); - assertTrue(producer4.getBatchMaxBytes() > 0); - assertTrue(producer4.getBatchMaxDelayMs() > 0); + assertEquals(0, producer4.getTotalBatchMaxBytes()); + assertEquals(0, producer4.getBatchMaxBytes()); + assertEquals(0, producer4.getBatchMaxDelayMs()); assertNull(producer4.getTopics()); assertTrue(producer4.isEnableTrace()); assertEquals("custom_trace_topic", producer4.getTraceTopic()); @@ -700,9 +701,9 @@ public void assertCreateDefaultMQProducer() { assertNotNull(producer5); assertEquals(producerGroupTemp, producer5.getProducerGroup()); assertNotNull(producer5.getDefaultMQProducerImpl()); - assertTrue(producer5.getTotalBatchMaxBytes() > 0); - assertTrue(producer5.getBatchMaxBytes() > 0); - assertTrue(producer5.getBatchMaxDelayMs() > 0); + assertEquals(0, producer5.getTotalBatchMaxBytes()); + assertEquals(0, producer5.getBatchMaxBytes()); + assertEquals(0, producer5.getBatchMaxDelayMs()); assertNotNull(producer5.getTopics()); assertEquals(1, producer5.getTopics().size()); assertTrue(producer5.isEnableTrace()); @@ -810,6 +811,136 @@ public void assertTotalBatchMaxBytes() throws NoSuchFieldException, IllegalAcces assertEquals(0L, producer.getTotalBatchMaxBytes()); } + @Test + public void assertProduceAccumulatorStart() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp); + assertEquals(0, producer.getTotalBatchMaxBytes()); + assertEquals(0, producer.getBatchMaxBytes()); + assertEquals(0, producer.getBatchMaxDelayMs()); + assertNull(getField(producer, "produceAccumulator", ProduceAccumulator.class)); + producer.start(); + assertTrue(producer.getTotalBatchMaxBytes() > 0); + assertTrue(producer.getBatchMaxBytes() > 0); + assertTrue(producer.getBatchMaxDelayMs() > 0); + assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class)); + } + + @Test + public void assertProduceAccumulatorBeforeStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp); + producer.totalBatchMaxBytes(64 * 1024 * 100); + producer.batchMaxBytes(64 * 1024); + producer.batchMaxDelayMs(10); + + producer.start(); + assertEquals(64 * 1024, producer.getBatchMaxBytes()); + assertEquals(10, producer.getBatchMaxDelayMs()); + assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class)); + } + + @Test + public void assertProduceAccumulatorAfterStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp); + producer.start(); + + assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class)); + + producer.totalBatchMaxBytes(64 * 1024 * 100); + producer.batchMaxBytes(64 * 1024); + producer.batchMaxDelayMs(10); + + assertEquals(64 * 1024, producer.getBatchMaxBytes()); + assertEquals(10, producer.getBatchMaxDelayMs()); + } + + @Test + public void assertProduceAccumulatorUnit() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp); + producer1.setUnitName("unit1"); + DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp); + producer2.setUnitName("unit2"); + + producer1.start(); + producer2.start(); + + ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class); + ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class); + + assertNotNull(producer1Accumulator); + assertNotNull(producer2Accumulator); + + assertNotEquals(producer1Accumulator, producer2Accumulator); + } + + @Test + public void assertProduceAccumulator() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp1 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1); + producer1.setInstanceName("instanceName1"); + String producerGroupTemp2 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2); + producer2.setInstanceName("instanceName2"); + + producer1.start(); + producer2.start(); + + ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class); + ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class); + + assertNotNull(producer1Accumulator); + assertNotNull(producer2Accumulator); + + assertNotEquals(producer1Accumulator, producer2Accumulator); + } + + @Test + public void assertProduceAccumulatorInstanceEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp1 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1); + producer1.setInstanceName("equalInstance"); + String producerGroupTemp2 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2); + producer2.setInstanceName("equalInstance"); + + producer1.start(); + producer2.start(); + + ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class); + ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class); + + assertNotNull(producer1Accumulator); + assertNotNull(producer2Accumulator); + + assertEquals(producer1Accumulator, producer2Accumulator); + } + + @Test + public void assertProduceAccumulatorInstanceAndUnitNameEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException { + String producerGroupTemp1 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1); + producer1.setInstanceName("equalInstance"); + producer1.setUnitName("equalUnitName"); + String producerGroupTemp2 = producerGroupPrefix + System.nanoTime(); + DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2); + producer2.setInstanceName("equalInstance"); + producer2.setUnitName("equalUnitName"); + + producer1.start(); + producer2.start(); + + ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class); + ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class); + + assertNotNull(producer1Accumulator); + assertNotNull(producer2Accumulator); + + assertEquals(producer1Accumulator, producer2Accumulator); + } + @Test public void assertGetRetryResponseCodes() { assertNotNull(producer.getRetryResponseCodes()); @@ -875,4 +1006,11 @@ private void setField(final Object target, final String fieldName, final Object field.setAccessible(true); field.set(target, newValue); } + + private T getField(final Object target, final String fieldName, final Class fieldClassType) throws NoSuchFieldException, IllegalAccessException { + Class targetClazz = target.getClass(); + Field field = targetClazz.getDeclaredField(fieldName); + field.setAccessible(true); + return fieldClassType.cast(field.get(target)); + } }