Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch 3.2 cherry pick #69

Merged
merged 9 commits into from
Jul 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ public static void createTenantIfAbsent(PulsarResources resources, String tenant
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
NamespaceResources namespaceResources = resources.getNamespaceResources();

if (!namespaceResources.namespaceExists(namespaceName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider transactionExecutorProvider;
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -904,6 +905,9 @@ public void start() throws PulsarServerException {

this.metricsGenerator = new MetricsGenerator(this);

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
readyForIncomingRequestsFuture.complete(null);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
// so that the protocol handlers can access broker service properly.
Expand Down Expand Up @@ -952,12 +956,22 @@ public void start() throws PulsarServerException {
state = State.Started;
} catch (Exception e) {
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
throw new PulsarServerException(e);
PulsarServerException startException = new PulsarServerException(e);
readyForIncomingRequestsFuture.completeExceptionally(startException);
throw startException;
} finally {
mutex.unlock();
}
}

public void runWhenReadyForIncomingRequests(Runnable runnable) {
readyForIncomingRequestsFuture.thenRun(runnable);
}

public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
readyForIncomingRequestsFuture.get();
}

protected BrokerInterceptor newBrokerInterceptor() throws IOException {
return BrokerInterceptors.load(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -63,7 +64,7 @@ public interface LoadManager {
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ public interface ExtensibleLoadManager extends Closeable {
* (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
* So the topic is optional.
* @param serviceUnit service unit (e.g. bundle).
* @param options The lookup options.
* @return The broker lookup data.
*/
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit,
LookupOptions options);

/**
* Check the incoming service unit is owned by the current broker.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;

public class ExtensibleLoadManagerWrapper implements LoadManager {
Expand Down Expand Up @@ -62,9 +63,15 @@ public boolean isCentralized() {

@Override
public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
return loadManager.assign(topic, bundle)
.thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
return loadManager.assign(topic, bundle, options)
.thenApply(lookupData -> lookupData.map(data -> {
try {
return data.toLookupResult(options);
} catch (PulsarServerException ex) {
throw FutureUtil.wrapToCompletionException(ex);
}
}));
}

@Override
Expand Down
Loading
Loading