Skip to content

Commit

Permalink
[pulsar-admin] add read-timeout to blocked admin-request (#4484)
Browse files Browse the repository at this point in the history
* [pulsar-admin] add read-timeout to blocked admin-request

* fix: pass readTimeout from broker + consistent naming
  • Loading branch information
rdhabalia authored and nkurihar committed Jun 13, 2019
1 parent 150961e commit 46a068b
Show file tree
Hide file tree
Showing 20 changed files with 97 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,13 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}

// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout
builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);

this.adminClient = builder.build();
LOG.info("Admin api url: " + adminApiUrl);
LOG.info("created admin with url {} ", adminApiUrl);
} catch (Exception e) {
throw new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,23 @@ public PulsarAdmin(String serviceUrl,
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient();

this.clusters = new ClustersImpl(root, auth);
this.brokers = new BrokersImpl(root, auth);
this.brokerStats = new BrokerStatsImpl(root, auth);
this.tenants = new TenantsImpl(root, auth);
this.properties = new TenantsImpl(root, auth);;
this.namespaces = new NamespacesImpl(root, auth);
this.topics = new TopicsImpl(root, auth);
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
this.sources = new SourcesImpl(root, auth, httpAsyncClient);
this.sinks = new SinksImpl(root, auth, httpAsyncClient);
this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
this.properties = new TenantsImpl(root, auth, readTimeoutMs);
this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
this.topics = new TopicsImpl(root, auth, readTimeoutMs);
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs);
this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs);
this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs);
this.functions = new FunctionsImpl(root, auth, httpAsyncClient, readTimeoutMs);
this.sources = new SourcesImpl(root, auth, httpAsyncClient, readTimeoutMs);
this.sinks = new SinksImpl(root, auth, httpAsyncClient, readTimeoutMs);
this.worker = new WorkerImpl(root, auth, readTimeoutMs);
this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public PreconditionFailedException(ClientErrorException e) {
super(e);
}
}
public static class TimeoutException extends PulsarAdminException {
public TimeoutException(Throwable t) {
super(t);
}
}

public static class ServerSideErrorException extends PulsarAdminException {
public ServerSideErrorException(ServerErrorException e, String msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public abstract class BaseResource {
private static final Logger log = LoggerFactory.getLogger(BaseResource.class);

protected final Authentication auth;
protected final long readTimeoutMs;

protected BaseResource(Authentication auth) {
protected BaseResource(Authentication auth, long readTimeoutMs) {
this.auth = auth;
this.readTimeoutMs = readTimeoutMs;
}

public Builder request(final WebTarget target) throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
public class BookiesImpl extends BaseResource implements Bookies {
private final WebTarget adminBookies;

public BookiesImpl(WebTarget web, Authentication auth) {
super(auth);
public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminBookies = web.path("/admin/v2/bookies");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
private final WebTarget adminBrokerStats;
private final WebTarget adminV2BrokerStats;

public BrokerStatsImpl(WebTarget target, Authentication auth) {
super(auth);
public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminBrokerStats = target.path("/admin/broker-stats");
adminV2BrokerStats = target.path("/admin/v2/broker-stats");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
public class BrokersImpl extends BaseResource implements Brokers {
private final WebTarget adminBrokers;

public BrokersImpl(WebTarget web, Authentication auth) {
super(auth);
public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminBrokers = web.path("/admin/v2/brokers");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class ClustersImpl extends BaseResource implements Clusters {

private final WebTarget adminClusters;

public ClustersImpl(WebTarget web, Authentication auth) {
super(auth);
public ClustersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminClusters = web.path("/admin/v2/clusters");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

public class ComponentResource extends BaseResource {

protected ComponentResource(Authentication auth) {
super(auth);
protected ComponentResource(Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
}

public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public class FunctionsImpl extends ComponentResource implements Functions {
private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;

public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class LookupImpl extends BaseResource implements Lookup {
private final WebTarget v2lookup;
private final boolean useTls;

public LookupImpl(WebTarget web, Authentication auth, boolean useTls) {
super(auth);
public LookupImpl(WebTarget web, Authentication auth, boolean useTls, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.useTls = useTls;
v2lookup = web.path("/lookup/v2");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
private final WebTarget adminNamespaces;
private final WebTarget adminV2Namespaces;

public NamespacesImpl(WebTarget web, Authentication auth) {
super(auth);
public NamespacesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminNamespaces = web.path("/admin/namespaces");
adminV2Namespaces = web.path("/admin/v2/namespaces");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
private final WebTarget adminNonPersistentTopics;
private final WebTarget adminV2NonPersistentTopics;

public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
super(auth);
public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminNonPersistentTopics = web.path("/admin");
adminV2NonPersistentTopics = web.path("/admin/v2");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
private final WebTarget adminQuotas;
private final WebTarget adminV2Quotas;

public ResourceQuotasImpl(WebTarget web, Authentication auth) {
super(auth);
public ResourceQuotasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminQuotas = web.path("/admin/resource-quotas");
adminV2Quotas = web.path("/admin/v2/resource-quotas");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class SchemasImpl extends BaseResource implements Schemas {

private final WebTarget target;

public SchemasImpl(WebTarget web, Authentication auth) {
super(auth);
public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.target = web.path("/admin/v2/schemas");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
private final WebTarget sink;
private final AsyncHttpClient asyncHttpClient;

public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.sink = web.path("/admin/v3/sink");
this.asyncHttpClient = asyncHttpClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
private final WebTarget source;
private final AsyncHttpClient asyncHttpClient;

public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.source = web.path("/admin/v3/source");
this.asyncHttpClient = asyncHttpClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
public class TenantsImpl extends BaseResource implements Tenants, Properties {
private final WebTarget adminTenants;

public TenantsImpl(WebTarget web, Authentication auth) {
super(auth);
public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminTenants = web.path("/admin/v2/tenants");
}

Expand Down
Loading

0 comments on commit 46a068b

Please sign in to comment.