Skip to content

Commit

Permalink
Remove partition-topic redirection as broker sends partition-metadata…
Browse files Browse the repository at this point in the history
… without redirection (#235)
  • Loading branch information
rdhabalia authored and merlimat committed Feb 23, 2017
1 parent 1691778 commit e2df954
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,15 @@ namespace pulsar {
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
conn->newPartitionedMetadataLookup(destinationName, requestId, lookupPromise);
lookupPromise->getFuture().addListener(boost::bind(&BinaryProtoLookupService::handleLookup, this, destinationName, _1, _2, clientCnx, promise));
lookupPromise->getFuture().addListener(boost::bind(&BinaryProtoLookupService::handlePartitionMetadataLookup, this, destinationName, _1, _2, clientCnx, promise));
}

void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& destinationName,
Result result, LookupDataResultPtr data, const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (data) {
if(data->isRedirect()) {
LOG_DEBUG("PartitionMetadataLookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(data->getBrokerUrl());
future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise));
} else {
LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
promise->setValue(data);
}
LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
promise->setValue(data);
} else {
LOG_DEBUG("PartitionMetadataLookup failed for " << destinationName << ", result " << result);
promise->setFailed(result);
Expand Down

0 comments on commit e2df954

Please sign in to comment.