Added Spark Streaming custom receiver for Pulsar #296
Conversation
pulsarClient = PulsarClient.create(url, clientConfiguration);
consumerConfiguration.setMessageListener(new MessageListener() {
    public void received(Consumer consumer, Message msg) {
        store(new String(msg.getData()));
Can we avoid the copy and pass the buffer instead?
Does store() throw any exceptions? If so, we should also handle them, right?
I think if we change from extends Receiver<String> to extends Receiver<byte[]>, we will be able to avoid the copy. Does that look good?
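For illustration, a minimal sketch of the listener once the class extends Receiver<byte[]> (a sketch only, not necessarily the PR's final code):

// With extends Receiver<byte[]>, the payload can be passed straight through:
consumerConfiguration.setMessageListener(new MessageListener() {
    public void received(Consumer consumer, Message msg) {
        store(msg.getData()); // no new String(...) copy
    }
});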
}

private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
}
We should also update the documentation.
OK. I'll do that.
    });
    pulsarClient.subscribe(topic, subscription, consumerConfiguration);
} catch (PulsarClientException e) {
    restart(e.getMessage(), e);
Is the exception logged by restart()? If not, it's better to log it here.
There are situations where client failures can be seen for a reasonably long period of time, so you might want to think about resource consumption in that case, given that restart() will recurse into onStart().
- You create a new PulsarClient on each call, turning the previous one into garbage.
- You create a new MessageListener implementation on each call, with similar garbage.
I'd set those up in the constructor while leaving the call to subscribe() in onStart().
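Sketched under those assumptions (field names follow the diff above; the exact constructor signature is illustrative, not the PR's final code), the suggestion looks roughly like this:

public SparkStreamingPulsarReceiver(String url, String topic, String subscription)
        throws PulsarClientException {
    super(StorageLevel.MEMORY_AND_DISK_2());
    this.topic = topic;
    this.subscription = subscription;
    // Long-lived objects are created once, not on every restart().
    this.pulsarClient = PulsarClient.create(url, clientConfiguration);
    consumerConfiguration.setMessageListener((consumer, msg) -> store(msg.getData()));
}

public void onStart() {
    try {
        // Only the subscription is (re)established on start/restart.
        pulsarClient.subscribe(topic, subscription, consumerConfiguration);
    } catch (PulsarClientException e) {
        restart(e.getMessage(), e);
    }
}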
@saandrews I added logging.
@msb-at-yahoo I reimplemented it as you suggested.
public void onStop() {
    try {
        if (pulsarClient != null) {
Is this transition valid: onStart -> onStop -> onStart?
onStart and onStop are called only once: onStart -> onStop.
OK. The reason I asked is that it's now a bit asymmetric after you made the change I suggested to onStart(): onStop() invalidates the invariants that onStart() relies upon.
Sorry, I confirmed that when restart() is called, onStop() -> onStart() is invoked. Since we create the client in the constructor, the receiver cannot re-subscribe when restarting. I think we should create the client in onStart().
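A rough sketch of that revised lifecycle, reusing the names from the diff (not necessarily the PR's final code): the client is created in onStart() and closed in onStop(), so restart()'s onStop() -> onStart() sequence leaves the receiver in a usable state.

public void onStart() {
    try {
        // A fresh client per (re)start keeps onStart() valid after onStop().
        pulsarClient = PulsarClient.create(url, clientConfiguration);
        pulsarClient.subscribe(topic, subscription, consumerConfiguration);
    } catch (PulsarClientException e) {
        log.error("Failed to start subscription : {}", e.getMessage());
        restart("Restart a consumer");
    }
}

public void onStop() {
    try {
        if (pulsarClient != null) {
            pulsarClient.close();
            pulsarClient = null;
        }
    } catch (PulsarClientException e) {
        restart("Restart a consumer", e);
    }
}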
I added documentation.
private String topic;
private String subscription;

public SparkStreamingPulsarReceiver(
Please format the files with the Eclipse formatter profile that can be found at src/formatter.xml
I applied the formatting.
this.topic = topic;
this.subscription = subscription;
consumerConfiguration.setMessageListener((consumer, msg) -> {
    store(new String(msg.getData()));
What happens if store() fails? If you want to retry the operation, you should block the listener thread until store() succeeds.
I couldn't find a case where store() fails in the official documentation or elsewhere. I think Spark Streaming is mainly used for computing statistics and similar workloads, so losing some data may not matter in those use cases. What do you think?
I think there are two options in case the store() call fails with an exception:
- Don't ack the message and rely on the ack timeout to replay the message some time later (e.g. 1 min).
- Just ack the message anyway and move on, basically ignoring the exception.
I prefer option 1 by default.
I chose option 1:
- AckTimeout is set if it isn't already set.
- Added a try block.
- When store() fails, the consumer doesn't ack.
- The message will be redelivered based on the AckTimeout in consumerConfiguration.
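A sketch of that listener, reusing names from the diff (getAckTimeoutMillis, setAckTimeout, and acknowledgeAsync are assumed from the consumer API of that era; the 60-second value is illustrative):

// Make sure an ack timeout is set so un-acked messages get redelivered.
if (consumerConfiguration.getAckTimeoutMillis() == 0) {
    consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
}

consumerConfiguration.setMessageListener((consumer, msg) -> {
    try {
        store(msg.getData());
        consumer.acknowledgeAsync(msg);
    } catch (Exception e) {
        // No ack on failure; the broker redelivers after the ack timeout.
        log.error("Failed to store a message: {}", e.getMessage());
    }
});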
    pulsarClient.subscribe(topic, subscription, consumerConfiguration);
} catch (PulsarClientException e) {
    log.error("Failed to start subscription : {}", e.getMessage());
    restart("Restart a consumer");
Wouldn't it be possible to just propagate the exception at this point?
PulsarClientException is a checked exception, so it has to be caught here. If we tried to propagate it as follows, onStart() would no longer compile as an override, because Receiver.onStart() does not declare any checked exceptions:

public void onStart() throws PulsarClientException { // not a valid override
    ...
    } catch (PulsarClientException e) {
        throw new PulsarClientException("error!");
    }
}
this.topic = topic;
this.subscription = subscription;
consumerConfiguration.setMessageListener((consumer, msg) -> {
    store(new String(msg.getData()));
Besides the concerns from @saandrews regarding the data copy, will using String prevent Spark users from consuming binary messages?
You may be right. The custom receiver may have to extend Receiver<byte[]> so that users can consume binary messages. I will implement it that way.
5b6879c to 547fb8a
👍
Fix apache#290. This PR supports continuous offsets for KoP. Since this PR disables the original design of mapping between Pulsar `MessageId` and Kafka `offset`, I just ignored some unit tests based on the original design. Maybe we can open another issue to track how we deal with these ignored tests.
Fixes apache#312. These tests were ignored temporarily because they rely on outdated methods that convert between MessageId and Kafka offset. This PR fixes these tests and deletes the outdated methods. The exception is testBrokerRespectsPartitionsOrderAndSizeLimits, a broken test that is easily affected by some parameters; apache#287 and apache#246 are tracking it.
Motivation
In recent years, more and more people have become interested in Apache Spark for machine learning and similar workloads, and there is demand for streaming data from a message queue into Apache Spark.
Modifications
Added a Spark Streaming custom receiver for Pulsar and its test.
Result
Pulsar can be used as a data source for Apache Spark.
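A minimal usage sketch (the constructor arguments are assumptions based on the fields shown in the diff, not the PR's exact signature):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

// Receive raw Pulsar payloads as byte[] records.
JavaReceiverInputDStream<byte[]> stream = jsc.receiverStream(
        new SparkStreamingPulsarReceiver(
                "pulsar://localhost:6650",                   // broker service URL
                "persistent://sample/standalone/ns1/topic1", // topic
                "spark-subscription"));                      // subscription name

stream.map(bytes -> new String(bytes)).print();

jsc.start();
jsc.awaitTermination();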