Skip to content

Commit

Permalink
[improve][admin] PulsarAdminBuilderImpl overrides timeout properties …
Browse files Browse the repository at this point in the history
…passed through config map (#17375)
  • Loading branch information
Technoboy- authored Nov 18, 2022
1 parent 007129b commit 79d1e52
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ public void testCertRefreshForPulsarAdmin() throws Exception {
.allowTlsInsecureConnection(false)
.enableTlsHostnameVerification(false)
.serviceHttpUrl(brokerUrlTls.toString())
.autoCertRefreshTime(1, TimeUnit.SECONDS)
.autoCertRefreshTime(autoCertRefreshTimeSec, TimeUnit.SECONDS)
.authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
String.format("tlsCertFile:%s,tlsKeyFile:%s",
getTLSFile(adminUser + ".cert"), keyFile))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,12 @@
public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {

protected ClientConfigurationData conf;
private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS;
private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS;
private int autoCertRefreshTime = PulsarAdminImpl.DEFAULT_CERT_REFRESH_SECONDS;
private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit readTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS;

private ClassLoader clientBuilderClassLoader = null;

@Override
public PulsarAdmin build() throws PulsarClientException {
return new PulsarAdminImpl(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout,
readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime,
autoCertRefreshTimeUnit, clientBuilderClassLoader);
return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader);
}

public PulsarAdminBuilderImpl() {
Expand Down Expand Up @@ -187,29 +178,25 @@ public PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols) {

@Override
public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) {
this.connectTimeout = connectionTimeout;
this.connectTimeoutUnit = connectionTimeoutUnit;
this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout));
return this;
}

@Override
public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) {
this.readTimeout = readTimeout;
this.readTimeoutUnit = readTimeoutUnit;
this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout));
return this;
}

@Override
public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) {
this.requestTimeout = requestTimeout;
this.requestTimeoutUnit = requestTimeoutUnit;
this.conf.setRequestTimeoutMs((int) requestTimeoutUnit.toMillis(requestTimeout));
return this;
}

@Override
public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) {
this.autoCertRefreshTime = autoCertRefreshTime;
this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit;
this.conf.setAutoCertRefreshSeconds((int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@
public class PulsarAdminImpl implements PulsarAdmin {
private static final Logger LOG = LoggerFactory.getLogger(PulsarAdmin.class);

public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300;
public static final int DEFAULT_CERT_REFRESH_SECONDS = 300;

private final Clusters clusters;
private final Brokers brokers;
Expand Down Expand Up @@ -106,38 +103,11 @@ public class PulsarAdminImpl implements PulsarAdmin {
private final Transactions transactions;
protected final WebTarget root;
protected final Authentication auth;
private final int connectTimeout;
private final TimeUnit connectTimeoutUnit;
private final int readTimeout;
private final TimeUnit readTimeoutUnit;
private final int requestTimeout;
private final TimeUnit requestTimeoutUnit;

public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException {
this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS,
DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS,
DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null);
}

public PulsarAdminImpl(String serviceUrl,
ClientConfigurationData clientConfigData,
int connectTimeout,
TimeUnit connectTimeoutUnit,
int readTimeout,
TimeUnit readTimeoutUnit,
int requestTimeout,
TimeUnit requestTimeoutUnit,
int autoCertRefreshTime,
TimeUnit autoCertRefreshTimeUnit,
ClassLoader clientBuilderClassLoader) throws PulsarClientException {

public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData,
ClassLoader clientBuilderClassLoader) throws PulsarClientException {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs to be specified");

this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
this.readTimeout = readTimeout;
this.readTimeoutUnit = readTimeoutUnit;
this.requestTimeout = requestTimeout;
this.requestTimeoutUnit = requestTimeoutUnit;
this.clientConfigData = clientConfigData;
this.auth = clientConfigData != null ? clientConfigData.getAuthentication() : new AuthenticationDisabled();
LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl,
Expand All @@ -152,7 +122,7 @@ public PulsarAdminImpl(String serviceUrl,
}

AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
(int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
clientConfigData.getAutoCertRefreshSeconds());

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand All @@ -168,8 +138,8 @@ public PulsarAdminImpl(String serviceUrl,

ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.withConfig(httpConfig)
.connectTimeout(this.connectTimeout, this.connectTimeoutUnit)
.readTimeout(this.readTimeout, this.readTimeoutUnit)
.connectTimeout(this.clientConfigData.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(this.clientConfigData.getReadTimeoutMs(), TimeUnit.MILLISECONDS)
.register(JacksonConfigurator.class).register(JacksonFeature.class);

boolean useTls = clientConfigData.getServiceUrl().startsWith("https://");
Expand All @@ -181,12 +151,12 @@ public PulsarAdminImpl(String serviceUrl,
root = client.target(serviceUri.selectOne());

this.asyncHttpConnector = asyncConnectorProvider.getConnector(
Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)),
(int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
Math.toIntExact(clientConfigData.getConnectionTimeoutMs()),
Math.toIntExact(clientConfigData.getReadTimeoutMs()),
Math.toIntExact(clientConfigData.getRequestTimeoutMs()),
clientConfigData.getAutoCertRefreshSeconds());

long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
long readTimeoutMs = clientConfigData.getReadTimeoutMs();
this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
Expand Down Expand Up @@ -228,7 +198,7 @@ public PulsarAdminImpl(String serviceUrl,
*/
@Deprecated
public PulsarAdminImpl(URL serviceUrl, Authentication auth) throws PulsarClientException {
this(serviceUrl.toString(), getConfigData(auth));
this(serviceUrl.toString(), getConfigData(auth), null);
}

private static ClientConfigurationData getConfigData(Authentication auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;

public class PulsarAdminBuilderImplTest {

Expand All @@ -35,4 +40,21 @@ public void testAdminBuilderWithServiceUrlNotSet() throws PulsarClientException
assertEquals("Service URL needs to be specified", exception.getMessage());
}
}

@Test
public void testGetPropertiesFromConf() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("serviceUrl", "pulsar://localhost:6650");
config.put("requestTimeoutMs", 10);
config.put("autoCertRefreshSeconds", 20);
config.put("connectionTimeoutMs", 30);
config.put("readTimeoutMs", 40);
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().loadConf(config);
PulsarAdminImpl admin = (PulsarAdminImpl) adminBuilder.build();
ClientConfigurationData clientConfigData = admin.getClientConfigData();
Assert.assertEquals(clientConfigData.getRequestTimeoutMs(), 10);
Assert.assertEquals(clientConfigData.getAutoCertRefreshSeconds(), 20);
Assert.assertEquals(clientConfigData.getConnectionTimeoutMs(), 30);
Assert.assertEquals(clientConfigData.getReadTimeoutMs(), 40);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -2182,19 +2181,9 @@ public void requestTimeout() throws Exception {
//Ok
}

ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get()).getClientConfigData();

final PulsarAdmin admin = tool.getPulsarAdminSupplier().get();
Field requestTimeoutField =
PulsarAdminImpl.class.getDeclaredField("requestTimeout");
requestTimeoutField.setAccessible(true);
int requestTimeout = (int) requestTimeoutField.get(admin);

Field requestTimeoutUnitField =
PulsarAdminImpl.class.getDeclaredField("requestTimeoutUnit");
requestTimeoutUnitField.setAccessible(true);
TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(admin);
assertEquals(1, requestTimeout);
assertEquals(TimeUnit.SECONDS, requestTimeoutUnit);
assertEquals(1000, conf.getRequestTimeoutMs());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ public class ClientConfigurationData implements Serializable, Cloneable {
)
private int requestTimeoutMs = 60000;

@ApiModelProperty(
name = "readTimeoutMs",
value = "Maximum read time of a request."
)
private int readTimeoutMs = 60000;

@ApiModelProperty(
name = "autoCertRefreshSeconds",
value = "Seconds of auto refreshing certificate."
)
private int autoCertRefreshSeconds = 300;

@ApiModelProperty(
name = "initialBackoffIntervalNanos",
value = "Initial backoff interval (in nanosecond)."
Expand Down

0 comments on commit 79d1e52

Please sign in to comment.