-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Topic lookup using Pulsar binary protocol #5
Conversation
CLA is valid! |
} | ||
}); | ||
|
||
LookupResult result = new LookupResult(new URI(candidateBroker), report.getPulsarServiceUrl(), report.getPulsarServieUrlTls()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use the http & https urls from the report here (all 4 the urls)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually in this case, returned LookupResult
contains redirect
urls.
- Http lookup request just requires broker-url where call should be redirected. So,
LookupResult
has onehttp/https url
- Binary-Protocol request sends both broker service-url/tls and client decides which one needs to be selected based on client configuration.
e254af3
to
ff3c89b
Compare
@@ -56,7 +56,8 @@ | |||
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); | |||
|
|||
private final ClientConfiguration conf; | |||
private final HttpClient httpClient; | |||
private HttpClient httpClient; | |||
private boolean isHttpURI; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should abstract the LookupService
to have 2 different implementation instead of relying on the flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.. made the change to abstract LookupService
.
e5dcb7a
to
f37db88
Compare
@@ -177,7 +177,8 @@ public SimpleLoadManagerImpl(PulsarService pulsar) { | |||
this.realtimeResourceQuotas.set(new HashMap<>()); | |||
this.realtimeAvgResourceQuota = new ResourceQuota(); | |||
placementStrategy = new WRRPlacementStrategy(); | |||
lastLoadReport = new LoadReport(); | |||
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is already in the other PR #11
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes..we picked up the same commit to unblock this enhancement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That PR is merged, so you can just rebase to get rid of the commit from this PR.
* | ||
* Lookup broker-service address for a given namespace-bundle which contains given topic. | ||
* | ||
* a. If current-broker receives lookup-request and if it's not a leader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we return the broker address if the bundle is already owned? If I'm not wrong, we only redirect the request if the bundle is not owned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.. that is correct. added that point in comment as well.
String clientAppId, long requestId) { | ||
final String cluster = fqdn.getCluster(); | ||
try { | ||
// (1) validate cluster |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we put these validation checks in separate methods maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, we have already abstracted all validation. and each validation step: we are returning different LookupResponse-ByteBuf
} | ||
|
||
private final Type type; | ||
private final LookupData lookupData; | ||
private final URI httpRedirectAddress; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you confirm if removing this will be backward compatible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.. as we were not returning httpRedirectAddress
to client and it was being used in internal method to find out final returning result.
- If
Type== HttpRedirectUrl
then redirect-http request to URLhttpRedirectAddress
.
However, in new change all urls service/web-service/tls
will be part of LookupData
and redirection will be determined using RedirectUrl
type.
@@ -366,7 +357,8 @@ private LookupResult findBrokerServiceUrl(ServiceUnitId suName, boolean authorit | |||
pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress(), candidateBroker, suName); | |||
} | |||
// Now setting the redirect url | |||
return new LookupResult(new URI(candidateBroker)); | |||
//new LookupResult(new URI(candidateBroker)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed comment..
return result; | ||
} | ||
|
||
public ObjectMapper jsonMapper() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is already there in AdminResource.java
} | ||
|
||
@Override | ||
protected void handlePartitionRequest(CommandPartitionedTopicMetadata partitionMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handlePartitionRequest
-> handlePartitionMetadataRequest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright.. renamed the method name.
return serviceAddress.toString(); | ||
} | ||
|
||
static class LookupDataResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we reuse the POJO that we already have for LookupResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LookupResult
POJO is being used to return Lookup result by Broker-http-lookup. However, to implement Lookup-Response-handling in binary-protocol lookup
we need few more additional attributes such as authoritative, redirect, partitions
. So, we have this POJO at client side to handle Future.Response
for topic-lookup
and partition-lookup
d6b782e
to
76094a4
Compare
Please ignore travis-build for this PR for now: as it seems there is an issue with travis-build, it always picks up a wrong commit-1b99fa608daa42955e09356f89ae498d6c969347 from different PR and that wrong commit had a unit-testcase-failure-issue which is already fixed in later commit of PR. I have local-travis-job setup which is green by picking up correct commit. |
uri.getPort()); | ||
pulsar.getLocalZkCache().getDataAsync(path, new Deserializer<LoadReport>() { | ||
@Override | ||
public LoadReport deserialize(String key, byte[] content) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deserializer could be made static
lookup.getAuthoritative(), getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> { | ||
ctx.writeAndFlush(lookupResponse); | ||
}).exceptionally(ex -> { | ||
// it should never happen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could actually fail for multiple reasons :) (invalid auth, no zk session.. etc )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, lookupDestinationAsync(..)
handles all failure scenarios (auth, no zk session,..) and always returns appropriate ByteBuf
with specific error-response. therefore, future should not complete with exception.
I have addressed all other comments as well.
@@ -128,7 +136,7 @@ protected void validateSuperUserAccess() { | |||
if (config().isAuthenticationEnabled()) { | |||
String appId = clientAppId(); | |||
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you wrap this log.debug()
call with the if (log.isDebugEnabled())
class BinaryProtoLookupService implements LookupService { | ||
|
||
private final ConnectionPool cnxPool; | ||
protected InetSocketAddress serviceAddress; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serviceAddress
can be final
@rdhabalia There seems to be a conflict with master in the latest push |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* add protobuf.json to git * make protobuf doc generation static
Motivation
To improve lookup performance: adding lookup API in Pulsar binary protocol for:
Modifications
Result
Result: Client-library can do topic-lookup using pulsar binary protocol.