diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 389a6af4aaa58..c1d4f54e4d72b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.testclient; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; +import com.beust.jcommander.IStringConverter; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.collect.Range; import com.google.common.util.concurrent.RateLimiter; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.FileOutputStream; @@ -200,6 +203,11 @@ static class Arguments extends PerformanceTopicListArguments { "--delay" }, description = "Mark messages with a given delay in seconds") public long delay = 0; + @Parameter(names = { "-dr", "--delay-range"}, description = "Mark messages with a given delay by a random" + + " number of seconds. this value between the specified origin (inclusive) and the specified bound" + + " (exclusive). e.g. \"1,300\"", converter = RangeConvert.class) + public Range delayRange = null; + @Parameter(names = { "-set", "--set-event-time" }, description = "Set the eventTime on messages") public boolean setEventTime = false; @@ -622,6 +630,10 @@ private static void runProducer(int producerId, } if (arguments.delay > 0) { messageBuilder.deliverAfter(arguments.delay, TimeUnit.SECONDS); + } else if (arguments.delayRange != null) { + final long deliverAfter = ThreadLocalRandom.current() + .nextLong(arguments.delayRange.lowerEndpoint(), arguments.delayRange.upperEndpoint()); + messageBuilder.deliverAfter(deliverAfter, TimeUnit.SECONDS); } if (arguments.setEventTime) { messageBuilder.eventTime(System.currentTimeMillis()); @@ -783,4 +795,21 @@ private static void printAggregatedStats() { public enum MessageKeyGenerationMode { autoIncrement, random } + + static class RangeConvert implements IStringConverter> { + @Override + public Range convert(String rangeStr) { + try { + requireNonNull(rangeStr); + final String[] facts = rangeStr.substring(1, rangeStr.length() - 1).split(","); + final long min = Long.parseLong(facts[0].trim()); + final long max = Long.parseLong(facts[1].trim()); + return Range.closedOpen(min, max); + } catch (Throwable ex) { + throw new IllegalArgumentException("Unknown delay range interval," + + " the format should be \",\". error message: " + rangeStr); + } + } + } + }