-
Notifications
You must be signed in to change notification settings - Fork 102
/
sync_consume_v5.rs
203 lines (174 loc) · 6.99 KB
/
sync_consume_v5.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// paho-mqtt/examples/sync_consume_v5.rs
// This is a Paho MQTT v5 Rust client, sample application.
//
//! This application is an MQTT v5 consumer/subscriber using the Rust
//! synchronous client interface, which uses the queuing API to
//! receive messages.
//!
//! It also uses MQTT v5 subscription identifiers to create an indexed table
//! for processing messages based on the subscribed topic.
//!
//! The sample demonstrates:
//! - Connecting to an MQTT server/broker
//! - Checking server responses
//! - Subscribing to multiple topics
//! - MQTT v5 subscription identifiers
//! - Receiving messages through the queueing consumer API
//! - Recieving and acting upon commands via MQTT topics
//! - Manual reconnects
//! - Using a persistent (non-clean) session
//! - Last will and testament
//! - Using ^C handler for a clean exit
//!
/*******************************************************************************
* Copyright (c) 2020-2023 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
use paho_mqtt as mqtt;
use std::{env, thread, time::Duration};
// --------------------------------------------------------------------------
// Handlers for different types of incoming messages based on their
// Subscription Identifiers
// Handler for data messages (i.e. topic "data/#")
// Subscription ID: 1
fn data_handler(msg: mqtt::Message) -> bool {
println!("{}", msg);
true
}
// Handler for command messages (i.e. topic "command")
// Return false to exit the application
// Subscription ID: 2
fn command_handler(msg: mqtt::Message) -> bool {
let cmd = msg.payload_str();
if cmd == "exit" {
println!("Exit command received");
false
}
else {
println!("Received command: '{}'", cmd);
true
}
}
// --------------------------------------------------------------------------
// This will attempt to reconnect to the broker. It can be called after
// connection is lost. In this example, we try to reconnect several times,
// with a few second pause between each attempt. A real system might keep
// trying indefinitely, with a backoff, or something like that.
fn try_reconnect(cli: &mqtt::Client) -> bool {
println!("Connection lost. Attempting to reconnect...");
for _ in 0..60 {
thread::sleep(Duration::from_secs(1));
if cli.reconnect().is_ok() {
println!(" Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Create a set of poperties with a single Subscription ID
fn sub_id(id: i32) -> mqtt::Properties {
mqtt::properties![
mqtt::PropertyCode::SubscriptionIdentifier => id
]
}
// --------------------------------------------------------------------------
fn main() -> mqtt::Result<()> {
// Initialize the logger from the environment
env_logger::init();
let host = env::args()
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());
println!("Connecting to the MQTT broker at '{}'...", host);
// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id("rust_sync_consumer_v5")
.finalize();
let cli = mqtt::Client::new(create_opts)?;
// Initialize the consumer before connecting
let rx = cli.start_consuming();
// Define the set of options for the connection
let lwt = mqtt::MessageBuilder::new()
.topic("lwt")
.payload("Sync consumer v5 lost connection")
.finalize();
// Connect with MQTT v5 and a persistent server session (no clean start).
// For a persistent v5 session, we must set the Session Expiry Interval
// on the server. Here we set that requests will persist for an hour
// (3600sec) if the service disconnects or restarts.
let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
.clean_start(false)
.properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
.will_message(lwt)
.finalize();
// A table of dispatch function for incoming messages by Subscription ID.
// (actually sub_id-1 since we can't use zero for a subscription ID)
let handler: Vec<fn(mqtt::Message) -> bool> = vec![data_handler, command_handler];
// Make the connection to the broker
let rsp = cli.connect(conn_opts)?;
// We're connecting with a persistent session. So we check if
// the server already knows about us and rembers our subscription(s).
// If not, we subscribe for incoming requests.
if let Some(conn_rsp) = rsp.connect_response() {
println!(
"Connected to: '{}' with MQTT version {}",
conn_rsp.server_uri, conn_rsp.mqtt_version
);
if conn_rsp.session_present {
println!(" w/ client session already present on broker.");
}
else {
// Register subscriptions on the server, using Subscription ID's.
println!(r#"Subscribing to topics ["data/#", "command"]..."#);
cli.subscribe_with_options("data/#", 0, None, sub_id(1))?;
cli.subscribe_with_options("command", 1, None, sub_id(2))?;
}
}
// ^C handler will stop the consumer, breaking us out of the loop, below
let ctrlc_cli = cli.clone();
ctrlc::set_handler(move || {
ctrlc_cli.stop_consuming();
})
.expect("Error setting Ctrl-C handler");
// Just loop on incoming messages.
// If we get a None message, check if we got disconnected,
// and then try a reconnect.
println!("\nWaiting for messages...");
for msg in rx.iter() {
if let Some(msg) = msg {
// In a real app you'd want to do a lot more error checking and
// recovery, but this should give an idea about the basics.
let sub_id = msg
.properties()
.get_int(mqtt::PropertyCode::SubscriptionIdentifier)
.expect("No Subscription ID") as usize;
if !handler[sub_id - 1](msg) {
break;
}
}
else if cli.is_connected() || !try_reconnect(&cli) {
break;
}
}
// If we're still connected, then disconnect now,
// otherwise we're already disconnected.
if cli.is_connected() {
println!("\nDisconnecting");
cli.disconnect(None).unwrap();
}
println!("Exiting");
Ok(())
}