Skip to content

Commit

Permalink
[fix][sec] Add a check for the input time value (apache#22023)
Browse files Browse the repository at this point in the history
(cherry picked from commit 60fed0d)
  • Loading branch information
liangyepianzhou authored and mukesh-ctds committed Mar 6, 2024
1 parent 84eedda commit a475bca
Show file tree
Hide file tree
Showing 22 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public int getMinimumRolloverTimeMs() {
* the time unit
*/
public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit) {
checkArgument(minimumRolloverTime >= 0);
this.minimumRolloverTimeMs = (int) unit.toMillis(minimumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Minimum rollover time needs to be less than maximum rollover time");
Expand All @@ -196,6 +197,7 @@ public long getMaximumRolloverTimeMs() {
* the time unit
*/
public void setMaximumRolloverTime(int maximumRolloverTime, TimeUnit unit) {
checkArgument(maximumRolloverTime >= 0);
this.maximumRolloverTimeMs = unit.toMillis(maximumRolloverTime);
checkArgument(maximumRolloverTimeMs >= minimumRolloverTimeMs,
"Maximum rollover time needs to be greater than minimum rollover time");
Expand Down Expand Up @@ -412,7 +414,8 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
* time unit for retention time
*/
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
this.retentionTimeMs = unit.toMillis(retentionTime);
checkArgument(retentionTime >= -1, "The retention time should be -1, 0 or value > 0");
this.retentionTimeMs = retentionTime != -1 ? unit.toMillis(retentionTime) : -1;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
Expand All @@ -41,6 +42,7 @@ public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Ex
}

public void refreshStats(long period, TimeUnit unit) {
checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;

if (seconds <= 0.0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -63,6 +64,7 @@ public ManagedLedgerMBeanImpl(ManagedLedgerImpl managedLedger) {
}

public void refreshStats(long period, TimeUnit unit) {
checkArgument(period >= 0);
double seconds = unit.toMillis(period) / 1000.0;
if (seconds <= 0.0) {
// skip refreshing stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import static com.google.common.base.Preconditions.checkArgument;
import static io.prometheus.client.CollectorRegistry.defaultRegistry;
import io.prometheus.client.Collector;
import io.prometheus.client.Summary;
Expand Down Expand Up @@ -70,6 +71,7 @@ public DimensionStats(String name, long updateDurationInSec) {
}

public void recordDimensionTimeValue(long latency, TimeUnit unit) {
checkArgument(latency >= 0);
summary.observe(unit.toMillis(latency));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Counter;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void addCount(long delta) {

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
checkArgument(eventLatency >= 0);
long valueMillis = unit.toMillis(eventLatency);
counter.add(valueMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ private void checkTopicRetentionPolicy(String topicName, RetentionPolicies reten
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
Assert.assertEquals(config.getRetentionTimeMillis(), retentionPolicies.getRetentionTimeInMinutes() < 0
? retentionPolicies.getRetentionTimeInMinutes()
: retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public ClientConfiguration setServiceUrl(String serviceUrl) {
* @param unit the time unit in which the duration is defined
*/
public void setConnectionTimeout(int duration, TimeUnit unit) {
checkArgument(duration >= 0);
confData.setConnectionTimeoutMs((int) unit.toMillis(duration));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public long getAckTimeoutMillis() {
* @return {@link ConsumerConfiguration}
*/
public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
checkArgument(ackTimeout >= 0);
long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
"Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -200,18 +201,21 @@ public PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols) {

@Override
public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) {
checkArgument(connectionTimeout >= 0);
this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout));
return this;
}

@Override
public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) {
checkArgument(readTimeout >= 0);
this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout));
return this;
}

@Override
public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) {
checkArgument(requestTimeout >= 0);
this.conf.setRequestTimeoutMs((int) requestTimeoutUnit.toMillis(requestTimeout));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public TransactionPendingAckStats getPendingAckStats(String topic, String subNam
@Override
public CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsByCoordinatorIdAsync(
Integer coordinatorId, long timeout, TimeUnit timeUnit) {
checkArgument(timeout >= 0);
WebTarget path = adminV3Transactions.path("slowTransactions");
path = path.path(timeUnit.toMillis(timeout) + "");
if (coordinatorId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public AutoClusterFailoverBuilder switchBackDelay(long switchBackDelay, TimeUnit

@Override
public AutoClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit) {
checkArgument(interval >= 0L, "check interval time must not be negative.");
this.checkIntervalMs = timeUnit.toMillis(interval);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {

@Override
public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {
checkArgument(lookupTimeout >= 0, "lookupTimeout must not be negative");
conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout));
return this;
}
Expand Down Expand Up @@ -331,6 +332,7 @@ public ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit) {

@Override
public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
checkArgument(duration >= 0, "connectionTimeout needs to be >= 0");
conf.setConnectionTimeoutMs((int) unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ public void acknowledge(Messages<?> messages) throws PulsarClientException {

@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
reconsumeLater(message, null, delayTime, unit);
}

Expand Down Expand Up @@ -563,6 +564,7 @@ public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long dela
@Override
public CompletableFuture<Void> reconsumeLaterAsync(
Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
Expand Down Expand Up @@ -599,12 +601,14 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
return reconsumeLaterCumulativeAsync(message, null, delayTime, unit);
}

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(
Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
checkArgument(delayTime >= 0, "The delay time must not be negative.");
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAckno

@Override
public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
checkArgument(duration >= 0, "expired time of incomplete chunk message must not be negative");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public ControlledClusterFailoverBuilder urlProviderHeader(Map<String, String> he

@Override
public ControlledClusterFailoverBuilder checkInterval(long interval, @NonNull TimeUnit timeUnit) {
checkArgument(interval >= 0, "The check interval time must not be negative.");
this.interval = timeUnit.toMillis(interval);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOl

@Override
public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
checkArgument(duration >= 0, "The expired time must not be negative.");
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -205,6 +206,7 @@ private void failPendingRequest() {
}

public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
checkArgument(timeout >= 0, "The timeout must not be negative.");
if (LOG.isDebugEnabled()) {
LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public TypedMessageBuilder<T> disableReplication() {

@Override
public TypedMessageBuilder<T> deliverAfter(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "The delay time must not be negative.");
return deliverAt(System.currentTimeMillis() + unit.toMillis(delay));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.transaction;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -45,6 +46,7 @@ public TransactionBuilderImpl(PulsarClientImpl client, TransactionCoordinatorCli

@Override
public TransactionBuilder withTransactionTimeout(long txnTimeout, TimeUnit timeoutUnit) {
checkArgument(txnTimeout >= 0, "The txn timeout must not be negative.");
this.txnTimeout = txnTimeout;
this.timeUnit = timeoutUnit;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.util;

import static com.google.common.base.Preconditions.checkArgument;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -33,6 +34,7 @@ public class ObjectCache<T> implements Supplier<T> {

public ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit) {
this(supplier, cacheDuration, unit, Clock.systemUTC());
checkArgument(cacheDuration >= 0, "The cache duration must not be negative.");
}

ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit, Clock clock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -364,6 +365,7 @@ public synchronized CompletableFuture<Void> promiseAfter(int steps, List<Complet
}

public synchronized void addEntryDelay(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "The delay time must not be negative.");
addEntryDelaysMillis.add(unit.toMillis(delay));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void addCount(long delta) {

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
checkArgument(eventLatency >= 0, "The event latency must not be negative.");
long valueMillis = unit.toMillis(eventLatency);
updateMax(val.addAndGet(valueMillis));
}
Expand Down

0 comments on commit a475bca

Please sign in to comment.