Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NPE when MultiTopicsConsumerImpl receives null value messages #9113

Conversation

BewareMyPower
Copy link
Contributor

Motivation

#6379 introduced the feature to handle null value messages, but it only checks the null value in ConsumerImpl when INCOMING_MESSAGES_SIZE_UPDATER is updated. Therefore, if a partitioned topic with at least 2 partitions was consumed with a null value message, the NPE would be thrown.

Modifications

  • Check the null value message in MultiTopicsConsumerImpl as well as ConsumerImpl. To reduce repeated code, two protected methods are added to ConsumerBase and INCOMING_MESSAGES_SIZE_UPDATER becomes private now, the derived consumer classes just use these two methods to update or reset INCOMING_MESSAGES_SIZE_UPDATER.
  • Add tests for partitioned topics in NullValueTest. Since the existed tests rely on the message send order, here we only send messages to a single partition only.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added tests to NullValueTest for partitioned topics

@BewareMyPower
Copy link
Contributor Author

/pulsarbot run-failure-checks

@sijie sijie added this to the 2.8.0 milestone Jan 4, 2021
@sijie sijie merged commit dd3b9d8 into apache:master Jan 4, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-multi-topics-consumer-null-value branch January 5, 2021 01:04
@codelipenghui
Copy link
Contributor

Remove from release 2.6.3 since conflict with #9046

@sijie
Copy link
Member

sijie commented Jan 6, 2021

@codelipenghui it might worth cherry-picking the change and submit a separate pull request.

codelipenghui pushed a commit that referenced this pull request Jan 6, 2021
)

[#6379](#6379) introduced the feature to handle null value messages, but it only checks the null value in `ConsumerImpl` when `INCOMING_MESSAGES_SIZE_UPDATER` is updated. Therefore, if a partitioned topic with at least 2 partitions was consumed with a null value message, the NPE would be thrown.

- Check the null value message in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` becomes private now, the derived consumer classes just use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
- Add tests for partitioned topics in `NullValueTest`. Since the existed tests rely on the message send order, here we only send messages to a single partition only.

(cherry picked from commit dd3b9d8)
codelipenghui pushed a commit that referenced this pull request Jan 7, 2021
)

### Motivation

[#6379](#6379) introduced the feature to handle null value messages, but it only checks the null value in `ConsumerImpl` when `INCOMING_MESSAGES_SIZE_UPDATER` is updated. Therefore, if a partitioned topic with at least 2 partitions was consumed with a null value message, the NPE would be thrown.

### Modifications

- Check the null value message in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` becomes private now, the derived consumer classes just use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
- Add tests for partitioned topics in `NullValueTest`. Since the existed tests rely on the message send order, here we only send messages to a single partition only.

(cherry picked from commit dd3b9d8)
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Jan 12, 2021
codelipenghui added a commit that referenced this pull request Jan 13, 2021
### Motivation

Fix incoming message size issue that introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With #9113, will always increase the incoming queue size.

### Modifications

Add method `increaseIncomingSize` and `decreaseIncomingSize`

### Verifying this change

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.
codelipenghui added a commit that referenced this pull request Jan 13, 2021
Fix incoming message size issue that introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With #9113, will always increase the incoming queue size.

Add method `increaseIncomingSize` and `decreaseIncomingSize`

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.

(cherry picked from commit 2cee0a8)
codelipenghui added a commit that referenced this pull request Jan 14, 2021
### Motivation

Fix incoming message size issue that introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With #9113, will always increase the incoming queue size.

### Modifications

Add method `increaseIncomingSize` and `decreaseIncomingSize`

### Verifying this change

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.

(cherry picked from commit 2cee0a8)
freeznet pushed a commit to streamnative/pulsar-archived that referenced this pull request Jan 14, 2021
…e#9182)

### Motivation

Fix incoming message size issue that introduced in apache#9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With apache#9113, will always increase the incoming queue size.

### Modifications

Add method `increaseIncomingSize` and `decreaseIncomingSize`

### Verifying this change

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.

(cherry picked from commit 2cee0a8)
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Apr 6, 2021
…e#9182)

### Motivation

Fix incoming message size issue that introduced in apache#9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With apache#9113, will always increase the incoming queue size.

### Modifications

Add method `increaseIncomingSize` and `decreaseIncomingSize`

### Verifying this change

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants