Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message dispatching based on consumer priority-level #165

Merged
merged 2 commits into from
Feb 26, 2017

Conversation

rdhabalia
Copy link
Contributor

Motivation

Addressing #134 : Message dispatching based on consumer priority-level

Result

Shared mode consumer priority, the messages will only goes to the consumers with higher priority if there's permit, otherwise the message can also be pushed to the consumers with lower priority.

@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jan 12, 2017
@rdhabalia rdhabalia added this to the 1.16 milestone Jan 12, 2017
@rdhabalia rdhabalia self-assigned this Jan 12, 2017
@yahoocla
Copy link

CLA is valid!

@rdhabalia rdhabalia modified the milestones: 1.17, 1.16 Jan 20, 2017
Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks good, though I'd like to understand more the logic to see that it doesn't add any penalty when priorities are not used

* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
* messages.
* </p>
* In Shared subscription mode, broker will first dispatch messages to upper priority-level consumers if they
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more intuitive to use descending priorities. (eg: 0=max-priority, 1, 2, ).

Priority 0 should also be the effective default applied to consumers that either:

  • Are running the old client version
  • Haven't set the configuration value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more intuitive to use descending priorities. (eg: 0=max-priority, 1, 2, ).

Actually, right now that's how the implementation is: 0=max-priority, 1=second, 2=..
sorting: consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());

I will update the documentation part to make it more clear.

}

// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;
// index of resulting consumer which will be returned
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic looks fine here, although it would be helpful to have a comment to explain the algorithm in its entirety.

Also, it would be good to verify here that there no performance regression in the case when priorities are not used. Eg. running a quick stress test and verify the CPU usage in broker doesn't shoot up compared to before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic looks fine here, although it would be helpful to have a comment to explain the algorithm in its entirety.

Added algorithm explaination

running a quick stress test and verify the CPU usage in broker doesn't shoot up compared to before.

I have run stress rest and I don't see impact in CPU usage. we don't perform extra logic when we have consumers on same priority-level.

without-changes of : Priority-level
screenshot from without-changes

with-changes of : Priority-level
screenshot from with-changes

Test
Shared-Subscription with 30 consumers

 @Test
    public void testSharedSubscptionConsumerLoad() throws Exception {
        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
        final String subscriberName = "my-subscriber-name";
        final int totalConsumers = 30;

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Shared);
        conf.setMessageListener((consumer, msg) -> {
            try {
                consumer.acknowledge(msg);
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        });
        List<Consumer> consumers = Lists.newArrayList();
        for (int i = 0; i < totalConsumers; i++) {
            consumers.add(pulsarClient.subscribe(topicName, subscriberName, conf));
        }
        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
        byte[] msg = "my-message-".getBytes();
        int totalMsgs = Integer.MAX_VALUE;
        for (int i = 0; i < totalMsgs; i++) {
            producer.sendAsync(msg);
        }

    }

@rdhabalia rdhabalia force-pushed the priority branch 4 times, most recently from 92dc640 to 86743c4 Compare February 24, 2017 20:07
Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat merged commit cd2d16e into apache:master Feb 26, 2017
@rdhabalia rdhabalia deleted the priority branch June 21, 2017 18:55
sijie added a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
- inject instance id in thread context. so we can have instance id information in function log
- rename dlog logging to bk logging, so all bk/dlog logging can be routed there
- add more variables to allow overrides by sys variables
- fixes some scripts
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
Signed-off-by: xiaolong.ran <rxl@apache.org>
hangc0276 added a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants