Skip to content

Commit

Permalink
[feat][test] Support delay messages in a random number of seconds by …
Browse files Browse the repository at this point in the history
…the range (apache#20717)
  • Loading branch information
mattisonchao authored Jul 5, 2023
1 parent ac33311 commit 4bf208d
Showing 1 changed file with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
*/
package org.apache.pulsar.testclient;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -200,6 +203,11 @@ static class Arguments extends PerformanceTopicListArguments {
"--delay" }, description = "Mark messages with a given delay in seconds")
public long delay = 0;

@Parameter(names = { "-dr", "--delay-range"}, description = "Mark messages with a given delay by a random"
+ " number of seconds. this value between the specified origin (inclusive) and the specified bound"
+ " (exclusive). e.g. \"1,300\"", converter = RangeConvert.class)
public Range<Long> delayRange = null;

@Parameter(names = { "-set",
"--set-event-time" }, description = "Set the eventTime on messages")
public boolean setEventTime = false;
Expand Down Expand Up @@ -622,6 +630,10 @@ private static void runProducer(int producerId,
}
if (arguments.delay > 0) {
messageBuilder.deliverAfter(arguments.delay, TimeUnit.SECONDS);
} else if (arguments.delayRange != null) {
final long deliverAfter = ThreadLocalRandom.current()
.nextLong(arguments.delayRange.lowerEndpoint(), arguments.delayRange.upperEndpoint());
messageBuilder.deliverAfter(deliverAfter, TimeUnit.SECONDS);
}
if (arguments.setEventTime) {
messageBuilder.eventTime(System.currentTimeMillis());
Expand Down Expand Up @@ -783,4 +795,21 @@ private static void printAggregatedStats() {
public enum MessageKeyGenerationMode {
autoIncrement, random
}

static class RangeConvert implements IStringConverter<Range<Long>> {
@Override
public Range<Long> convert(String rangeStr) {
try {
requireNonNull(rangeStr);
final String[] facts = rangeStr.substring(1, rangeStr.length() - 1).split(",");
final long min = Long.parseLong(facts[0].trim());
final long max = Long.parseLong(facts[1].trim());
return Range.closedOpen(min, max);
} catch (Throwable ex) {
throw new IllegalArgumentException("Unknown delay range interval,"
+ " the format should be \"<origin>,<bound>\". error message: " + rangeStr);
}
}
}

}

0 comments on commit 4bf208d

Please sign in to comment.