Pass brokerServiceUrl to websocket service configuration #166

Merged Jan 20, 2017 (1 commit)

rdhabalia
Contributor

Motivation

Provide a way to deploy the websocket-proxy service independently, without access to globalZookeeper. Right now, the proxy service requires globalZK for:

  • fetching brokerService url for a given cluster
  • enabling authorization

So the websocket-proxy service should have configuration options to disable authorization and to read the broker-service URL from its config.

Modifications

The broker-service URL can now be configured on the websocket-proxy service.

Result

Websocket-proxy can be deployed without accessing global-zookeeper.
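
A minimal sketch of what such a standalone configuration could look like when read with `java.util.Properties`. The keys `serviceUrl`, `serviceUrlTls`, and `globalZookeeperServers` are the ones named in this PR's discussion; the class `StandaloneProxyConfigSketch`, its helper methods, and the example URLs are assumptions made for illustration and are not the PR's actual code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class StandaloneProxyConfigSketch {

    // Hypothetical config text; the URLs are placeholders, not real endpoints.
    static final String EXAMPLE_CONF = String.join("\n",
            "# Left empty: the proxy is meant to run without global ZooKeeper",
            "globalZookeeperServers=",
            "serviceUrl=http://broker.example.com:8080",
            "serviceUrlTls=https://broker.example.com:8443");

    // Parses properties-format text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when at least one broker URL is set directly in the config,
    // so no ZooKeeper lookup is needed to find the broker.
    static boolean hasConfiguredBrokerUrl(Properties props) {
        return !props.getProperty("serviceUrl", "").trim().isEmpty()
                || !props.getProperty("serviceUrlTls", "").trim().isEmpty();
    }
}
```

With `EXAMPLE_CONF`, `hasConfiguredBrokerUrl` returns true; a config that only sets `globalZookeeperServers` returns false, which corresponds to the case where the proxy would still need ZooKeeper to find the broker.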

@rdhabalia rdhabalia added the type/enhancement label Jan 17, 2017
@rdhabalia rdhabalia added this to the 1.16 milestone Jan 17, 2017
@rdhabalia rdhabalia self-assigned this Jan 17, 2017
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.FieldContext;

public class ServiceConfig {

Contributor

Instead of ServiceConfig, we could use something like WebSocketProxyConfiguration

Contributor Author

Sure, I renamed ServiceConfig to WebSocketProxyConfiguration.

@yahoocla

Thank you for submitting this pull request, however I do not see a valid CLA on file for you. Before we can merge this request please visit https://yahoocla.herokuapp.com/ and agree to the terms. Thanks! 😄

@yahoocla

CLA is valid!

@merlimat
Contributor

Does this PR depend on #168?

@rdhabalia
Contributor Author

rdhabalia commented Jan 19, 2017

Yes. I will add the change to #168 once this PR is merged, or the other way around.

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=

Contributor

Shouldn't we also have serviceUrl and serviceUrlTls here?

Contributor Author

Actually, I left it out of the config to avoid confusion, since serviceUrl is optional when globalZookeeperServers is already present.

We still keep clusterName in the config because Authorization requires it, and the proxy fetches serviceUrl from ZooKeeper if globalZookeeperServers and clusterName are present.

Do you think we should still add serviceUrl, with a comment giving the short description above?

Contributor Author

Fixed: added serviceUrl and serviceUrlTls.

@@ -163,7 +172,37 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce
        }
    }

    private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
        if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrl())) {
            return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls());

Contributor

Why have the same check twice? Did you intend to use config.getServiceUrlTls()?
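
For illustration, a self-contained sketch of the check with the second operand switched to the TLS URL, as the review comment suggests. `ProxyConfig` and `ClusterData` here are simplified stand-ins for the PR's `WebSocketProxyConfiguration` and `ClusterData`, the local `isNotBlank` helper mimics the one used in the diff, and returning an empty `Optional` for the "no URL configured" case is an assumption, since the rest of the original method is not shown in this excerpt.

```java
import java.util.Optional;

final class ClusterDataSketch {

    // Simplified stand-in for the proxy configuration; only the two URLs are modeled.
    static final class ProxyConfig {
        final String serviceUrl;
        final String serviceUrlTls;

        ProxyConfig(String serviceUrl, String serviceUrlTls) {
            this.serviceUrl = serviceUrl;
            this.serviceUrlTls = serviceUrlTls;
        }
    }

    // Simplified stand-in for the cluster data holder.
    static final class ClusterData {
        final String serviceUrl;
        final String serviceUrlTls;

        ClusterData(String serviceUrl, String serviceUrlTls) {
            this.serviceUrl = serviceUrl;
            this.serviceUrlTls = serviceUrlTls;
        }
    }

    // Local helper: true when the string is non-null and not only whitespace.
    static boolean isNotBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }

    // Builds cluster data from the config when either URL is set.
    // An empty result means the caller has to obtain it elsewhere (assumption).
    static Optional<ClusterData> createClusterData(ProxyConfig config) {
        if (isNotBlank(config.serviceUrl) || isNotBlank(config.serviceUrlTls)) {
            return Optional.of(new ClusterData(config.serviceUrl, config.serviceUrlTls));
        }
        return Optional.empty();
    }
}
```

The difference shows up for a TLS-only configuration: checking `serviceUrl` twice would skip the configured value, while checking both fields picks it up.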

@rdhabalia rdhabalia force-pushed the proxy_zk branch 2 times, most recently from f7a0c92 to 32153fb January 20, 2017 18:50

Contributor

@saandrews saandrews left a comment

LGTM

Contributor

@merlimat merlimat left a comment

👍

@rdhabalia rdhabalia merged commit 0569689 into apache:master Jan 20, 2017
@rdhabalia rdhabalia deleted the proxy_zk branch January 23, 2017 22:04
sijie added a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
Fixing a typo in a readme.md following changes done in apache#141
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
**Fixes:**
 apache#166 apache#153 apache#99 

**Issue:**
KoP uses [Kafka-2.0.0](https://github.com/streamnative/kop/blob/78d9ba3487d4d7c85a5d667d45d9b38aaa7c824f/pom.xml#L46), which supports [API_VERSIONS](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions) versions **0** through **2**.

When **_Kafka-Clients-2.4.x+_** (using `API_VERSION:  3`) connects to KoP, KoP fails to build the response and the following stack trace is observed:
`10:22:23.281 [pulsar-io-22-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error to get Response ByteBuf:
java.lang.IllegalArgumentException: Invalid version for API key API_VERSIONS: 3
    at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:312) ~[?:?]
    at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:286) ~[?:?]
    at org.apache.kafka.common.requests.ApiVersionsResponse.toStruct(ApiVersionsResponse.java:129) ~[?:?]
    at org.apache.kafka.common.requests.ResponseUtils.serializeResponse(ResponseUtils.java:40) ~[?:?]`


**Resolved By:**
Returning an `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), which notifies the **_kafka-client_** to lower its `API_VERSION`. Because no list of `ApiKeys & versions` is available for the **kafka-client** to consult, it safely falls back to `API_VERSION:  0`, and KoP continues processing the Kafka messages using `API_VERSION:  0`.

**Tested producing/consuming with Kafka-Clients:**

> 2.0.0
2.2.2
2.3.1
2.4.0
2.4.1
2.5.0
2.5.1
2.6.0


**More...**
KoP could have returned the list of supported `ApiKeys & versions` while sending the `UNSUPPORTED_VERSION` error-code, which would have made the **_kafka-client_** select the **_latest_** supported `API_VERSION` and use `API_VERSION:  2` instead of falling all the way back and using `API_VERSION:  0` 
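
The two outcomes described above can be modeled with a short sketch. This is a simplified illustration of the described client behavior, not the Kafka client's actual implementation; the class `ApiVersionFallbackSketch` and the method `chooseVersion` are hypothetical names, and only the error code 35 comes from the text.

```java
import java.util.OptionalInt;

final class ApiVersionFallbackSketch {

    // Error code for UNSUPPORTED_VERSION, as cited in the commit message.
    static final short UNSUPPORTED_VERSION = 35;

    // Picks the API_VERSIONS request version a client would retry with after
    // receiving UNSUPPORTED_VERSION.
    //   clientMax           highest version the client itself supports
    //   brokerMaxAdvertised highest version the broker listed in its error
    //                       response, or empty if no list was returned
    static int chooseVersion(int clientMax, OptionalInt brokerMaxAdvertised) {
        if (brokerMaxAdvertised.isPresent()) {
            // Latest version supported by both sides.
            return Math.min(clientMax, brokerMaxAdvertised.getAsInt());
        }
        // No list to consult: fall all the way back to version 0.
        return 0;
    }
}
```

Under this model, a client that supports version 3 retries with version 0 when no list is returned, and with version 2 when the broker advertises 2 as its maximum.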


Notes on how **_Kafka-Brokers_** are supposed to handle this scenario:
> 2. On receiving ApiVersionsRequest, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the ApiVersionRequest is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request.
> 
> 3. If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.


_Reference: [Kafka-Protocol Guide](https://kafka.apache.org/protocol.html#api_versions)_

We analyzed how various **_Kafka-Brokers_** respond to a similar situation where the **_kafka-client's_** `API_VERSION` is higher than what is supported by the **_Kafka-Broker_**.

![Kafka-Broker-Client-Wireshark-Results](https://user-images.githubusercontent.com/63665447/91243701-34d3a500-e710-11ea-9752-f9980333ce1d.png)
_Reference: Wireshark packet captures - [Kafka-Protocol-Study.zip](https://github.com/streamnative/kop/files/5127018/Kafka-Protocol-Study.zip)_

From the study we can infer that, in a similar `API_VERSION` mismatch scenario, the **_Kafka-Brokers_** don't return the list of supported `ApiKeys & versions` when notifying the **_kafka-client_** with the `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), thus forcing the **_kafka-clients_** to fall back to using `API_VERSION:  0`.

To keep KoP's behavior consistent with that of the **_Kafka-Broker_**, we decided not to return the list of supported `ApiKeys & versions`.