Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncClient::get_stream() support unbounded channel #191

Merged

Conversation

fbrouille
Copy link
Contributor

Sometimes, a MQTT broker provides a lot of topics with a retain message for each of them.
Within current implementation, AsyncClient relies on a bounded channel that may be fully filled very quickly, even if messages are consumed and processed very fast. Fully filled the bounded channel results in losing messages. Note that I tried to determinate the relevant channel buffer capacity, but this might be difficult.
This PR aims to fix losing messages by using an unbounded channel if buffer_sz is 0. This way, the underlying channel will grow up automatically to buffer all received messages.
Of course, this is not the ideal solution for all use cases because of the memory footprint. I think that a MQTT broker usually does not provide so many topics with retain messages. So generally setting buffer_sz to a positive number is the way to go.

Signed-off-by: fbrouille <fbrouille@users.noreply.github.com>
@fpagliughi fpagliughi added this to the v0.12.1 milestone Feb 28, 2023
@fpagliughi
Copy link
Contributor

A few people have asked about this kind of thing since the original sync consumer. I'll merge this with the next point release, but may reconsider the API... having a special integer value for "no limit" seems like such a C thing to do :-)

Maybe an Into<Option<usize>>? Or am I getting complicated?

But also, we should be consistent with start_consuming() although these are backwards, I guess.

…usize>

Signed-off-by: fbrouille <fbrouille@users.noreply.github.com>
@fbrouille
Copy link
Contributor Author

IMO Option<usize> makes sense: None for an unbounded channel and Some(capacity) for a bounded channel.
I have also added some comments on the parameter buffer_sz.

@fpagliughi
Copy link
Contributor

Yeah, nice. Thanks for updating it. Maybe I'll rename the parameter buffer_limit or something like that, then the option will make more sense... A "limit" of "None" logically means unbounded.

But I think I will take it a step further and make it into an Into<Option<usize>>. That way it won't break backward compatibility with the existing code. You can say:

cli.get_stream(32);

which would convert to cli.get_stream(Some(32)) automatically.

Thanks for the PR.

@fpagliughi fpagliughi merged commit fd60f84 into eclipse-paho:develop Mar 9, 2023
@fpagliughi
Copy link
Contributor

To be clear... The PR, as submitted, broke the build of the examples:

error[E0308]: mismatched types
    --> examples/async_subscribe.rs:69:39
     |
69   |         let mut strm = cli.get_stream(25);
     |                            ---------- ^^ expected enum `Option`, found integer
     |                            |
     |                            arguments to this method are incorrect
     |
     = note: expected enum `Option<usize>`
                found type `{integer}`
note: associated function defined here
    --> /home/fmp/mqtt/paho.mqtt.rust/src/async_client.rs:1119:12
     |
1119 |     pub fn get_stream(&mut self, buffer_sz: Option<usize>) -> AsyncReceiver<Option<Message>> {
     |            ^^^^^^^^^^
help: try wrapping the expression in `Some`
     |
69   |         let mut strm = cli.get_stream(Some(25));
     |                                       +++++  +

but changing it to an Into<Option<usize>> gets the functionality you want without breaking backward compatibility.

pub fn get_stream<L>(&mut self, buffer_lim: L) -> AsyncReceiver<Option<Message>>
where
    L: Into<Option<usize>>,
{
    let (tx, rx) = match buffer_lim.into() {
        None => async_channel::unbounded(),
        Some(lim) => async_channel::bounded(lim),
    };

    ...
}

Plus it's nicer to be able to call the function for a bounded channel without needing the Some().

@fbrouille
Copy link
Contributor Author

thank you for the fix

@fpagliughi
Copy link
Contributor

Thanks again for the PR. I will update the synchronous channel version to do the same thing when I do the next major update, when I can make some breaking changes (v0.13)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants