Skip to content

Commit

Permalink
feat(cli): support setting the connect user properties in pub or sub …
Browse files Browse the repository at this point in the history
…command
  • Loading branch information
Red-Asuka authored and ysfscream committed Oct 10, 2022
1 parent 58b0b45 commit 718b2df
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 43 deletions.
104 changes: 74 additions & 30 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,16 @@ export class Commander {
this.program
.command('pub')
.description('Publish a message to a topic.')
.option('-V, --mqtt-version <5/3.1.1/3.1>', 'the MQTT version', parseMQTTVersion, 5)
.option('-h, --hostname <HOST>', 'the broker host', 'localhost')
.option('-p, --port <PORT>', 'the broker port', parseNumber)
.option('-i, --client-id <ID>', 'the client id', getClientId())
.option('-q, --qos <0/1/2>', 'the QoS of the message', parseNumber, 0)
.requiredOption('-t, --topic <TOPIC>', 'the message topic', parsePubTopic)
.option('-m, --message <MSG>', 'the message body', 'Hello From MQTT X CLI')
.option('-m, --message <BODY>', 'the message body', 'Hello From MQTT X CLI')
.option('-q, --qos <0/1/2>', 'the QoS of the message', parseNumber, 0)
.option('-r, --retain', 'send a retained message')
.option('-d, --dup', 'mark as duplicate flag')
.option('-s, --stdin', 'read the message body from stdin')
.option('-M, --multiline', 'read lines from stdin as multiple messages')
// properties options of MQTT 5.0
.option('-pf, --payload-format-indicator', 'the payload format indicator of the publish message')
.option('-me, --message-expiry-interval <NUMBER>', 'the lifetime of the publish message in seconds', parseNumber)
.option('-e, --message-expiry-interval <NUMBER>', 'the lifetime of the publish message in seconds', parseNumber)
.option(
'-ta, --topic-alias <NUMBER>',
'value that is used to identify the topic instead of using the topic name',
Expand All @@ -109,46 +106,64 @@ export class Commander {
'-cd, --correlation-data <DATA>',
'used by the sender of the request message to identify which request the response message is for when it is received',
)
.option(
'-up, --user-properties <USERPROPERTIES...>',
'the user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
parseUserProperties,
)
.option('-si, --subscription-identifier <NUMBER>', 'the identifier of the subscription', parseNumber)
.option('-ct, --content-type <TYPE>', 'a description of the content of the publish message')
// connect options
.option('-V, --mqtt-version <5/3.1.1/3.1>', 'the MQTT version', parseMQTTVersion, 5)
.option('-h, --hostname <HOST>', 'the broker host', 'localhost')
.option('-p, --port <PORT>', 'the broker port', parseNumber)
.option('-i, --client-id <ID>', 'the client id', getClientId())
.option('--no-clean', 'set the clean session flag to false (default: true)')
.option('-k, --keepalive <SEC>', 'send a ping every SEC seconds', parseNumber, 30)
.option('-u, --username <USER>', 'the username')
.option('-P, --password <PASS>', 'the password')
.option('-l, --protocol <PROTO>', 'the protocol to use, mqtt or mqtts (default: mqtt)', parseProtocol)
.option('--key <PATH>', 'path to the key file')
.option('--cert <PATH>', 'path to the cert file')
.option('--ca <PATH>', 'path to the ca certificate')
.option('--insecure', 'do not verify the server certificate')
// connect properties options of MQTT 5.0
.option('-se, --session-expiry-interval <SECONDS>', 'the session expiry interval in seconds', parseNumber)
.option('--rcv-max, --receive-maximum <NUMBER>', 'the receive maximum value', parseNumber)
.option('--maximum-packet-size <NUMBER>', 'the maximum packet size the client is willing to accept', parseNumber)
.option('--topic-alias-maximum <NUMBER>', 'the topic alias maximum value', parseNumber)
.option('--req-response-info', 'the client requests response information from the server')
.option('--no-req-problem-info', 'the client requests problem information from the server')
.option(
'-up, --user-properties <USERPROPERTIES...>',
'the user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
'-Cup, --conn-user-properties <USERPROPERTIES...>',
'the connect user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
parseUserProperties,
)
// will message options
.option('-Wt, --will-topic <TOPIC>', 'the will topic')
.option('-Wm, --will-message <BODY>', 'the will message')
.option('-Wq, --will-qos <0/1/2>', 'the will qos', parseNumber)
.option('-Wr, --will-retain', 'send a will retained message')
// will message properties options of MQTT 5.0
.option('-Wd, --will-delay-interval <SECONDS>', 'the will delay interval in seconds', parseNumber)
.option('-Wpf, --will-payload-format-indicator', 'will message is UTF-8 encoded character data or not')
.option('-We, --will-message-expiry-interval <SECONDS>', 'lifetime of the will message in seconds', parseNumber)
.option('-Wct, --will-content-type <CONTENTTYPE>', 'description of the will message’s content')
.option('-Wrt, --will-response-topic <TOPIC>', 'topic name for a response message')
.option('-Wcd, --will-correlation-data <DATA>', 'correlation data for the response message')
.option(
'-Wup, --will-user-properties <USERPROPERTIES...>',
'the user properties of will message',
parseUserProperties,
)
.option('--will-topic <TOPIC>', 'the will topic')
.option('--will-message <BODY>', 'the will message')
.option('--will-qos <0/1/2>', 'the will qos', parseNumber, 0)
.option('--will-retain', 'send a will retained message')
.action(pub)

this.program
.command('sub')
.description('Subscribes to a topic.')
.option('-V, --mqtt-version <5/3.1.1/3.1>', 'the MQTT version', parseMQTTVersion, 5)
.option('-h, --hostname <HOST>', 'the broker host', 'localhost')
.option('-p, --port <PORT>', 'the broker port', parseNumber)
.option('-i, --client-id <ID>', 'the client id', getClientId())
.option('-q, --qos <0/1/2...>', 'the QoS of the message', parseQoS)
.option('--no-clean', 'set the clean session flag to false (default: true)')
.requiredOption('-t, --topic <TOPIC...>', 'the message topic')
.option('-k, --keepalive <SEC>', 'send a ping every SEC seconds', parseNumber, 30)
.option('-u, --username <USER>', 'the username')
.option('-P, --password <PASS>', 'the password')
.option('-l, --protocol <PROTO>', 'the protocol to use, mqtt or mqtts (default: mqtt)', parseProtocol)
.option('-q, --qos <0/1/2...>', 'the QoS of the message', parseQoS)
// properties options of MQTT 5.0
.option('-nl, --no_local [FLAG...]', 'the no local MQTT 5.0 flag', parseVariadicOfBooleanType)
.option(
'-rap, --retain-as-published [FLAG...]',
Expand All @@ -157,26 +172,55 @@ export class Commander {
)
.option('-rh, --retain-handling <0/1/2...>', 'the retain handling MQTT 5.0', parseQoS)
.option('-si, --subscription-identifier <NUMBER...>', 'the identifier of the subscription', parseNumber)
.option(
'-up, --user-properties <USERPROPERTIES...>',
'the user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
parseUserProperties,
)
.option('-v, --verbose', 'print the topic before the message')
// connect options
.option('-V, --mqtt-version <5/3.1.1/3.1>', 'the MQTT version', parseMQTTVersion, 5)
.option('-h, --hostname <HOST>', 'the broker host', 'localhost')
.option('-p, --port <PORT>', 'the broker port', parseNumber)
.option('-i, --client-id <ID>', 'the client id', getClientId())
.option('--no-clean', 'set the clean session flag to false (default: true)')

.option('-u, --username <USER>', 'the username')
.option('-P, --password <PASS>', 'the password')
.option('-l, --protocol <PROTO>', 'the protocol to use, mqtt or mqtts (default: mqtt)', parseProtocol)
.option('--key <PATH>', 'path to the key file')
.option('--cert <PATH>', 'path to the cert file')
.option('--ca <PATH>', 'path to the ca certificate')
.option('--insecure', 'do not verify the server certificate')
// connect properties options of MQTT 5.0
.option('-se, --session-expiry-interval <SECONDS>', 'the session expiry interval in seconds', parseNumber)
.option('--rcv-max, --receive-maximum <NUMBER>', 'the receive maximum value', parseNumber)
.option('--maximum-packet-size <NUMBER>', 'the maximum packet size the client is willing to accept', parseNumber)
.option('--topic-alias-maximum <NUMBER>', 'the topic alias maximum value', parseNumber)
.option('--req-response-info', 'the client requests response information from the server')
.option('--no-req-problem-info', 'the client requests problem information from the server')
.option(
'-up, --user-properties <USERPROPERTIES...>',
'the user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
'-Cup, --conn-user-properties <USERPROPERTIES...>',
'the connect user properties of MQTT 5.0 (e.g. -up "name: mqttx cli")',
parseUserProperties,
)
// will message options
.option('-Wt, --will-topic <TOPIC>', 'the will topic')
.option('-Wm, --will-message <BODY>', 'the will message')
.option('-Wq, --will-qos <0/1/2>', 'the will qos', parseNumber)
.option('-Wr, --will-retain', 'send a will retained message')
// will message properties options of MQTT 5.0
.option('-Wd, --will-delay-interval <SECONDS>', 'the will delay interval in seconds', parseNumber)
.option('-Wpf, --will-payload-format-indicator', 'will message is UTF-8 encoded character data or not')
.option('-We, --will-message-expiry-interval <SECONDS>', 'lifetime of the will message in seconds', parseNumber)
.option('-Wct, --will-content-type <CONTENTTYPE>', 'description of the will message’s content')
.option('-Wrt, --will-response-topic <TOPIC>', 'topic name for a response message')
.option('-Wcd, --will-correlation-data <DATA>', 'correlation data for the response message')
.option(
'-Wup, --will-user-properties <USERPROPERTIES...>',
'the user properties of will message',
parseUserProperties,
)
.option('--will-topic <TOPIC>', 'the will topic')
.option('--will-message <BODY>', 'the will message')
.option('--will-qos <0/1/2>', 'the will qos', parseNumber)
.option('--will-retain', 'send a will retained message')
.option('-v, --verbose', 'print the topic before the message')
.action(sub)
}
}
Expand Down
1 change: 0 additions & 1 deletion cli/src/lib/conn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as mqtt from 'mqtt'
import * as fs from 'fs'
import signale from '../utils/signale'
import { parseConnectOptions } from '../utils/parse'

Expand Down
7 changes: 3 additions & 4 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as mqtt from 'mqtt'
import * as fs from 'fs'
import pump from 'pump'
import concat from 'concat-stream'
import { Writable } from 'readable-stream'
Expand All @@ -10,15 +9,15 @@ import { parseConnectOptions, parsePublishOptions } from '../utils/parse'

const send = (
connOpts: IClientOptions,
pubOpts: { topic: string; message?: string | Buffer; opts: IClientPublishOptions },
pubOpts: { topic: string; message: string | Buffer; opts: IClientPublishOptions },
) => {
const client = mqtt.connect(connOpts)
signale.await('Connecting...')
client.on('connect', () => {
signale.success('Connected')
const { topic, message } = pubOpts
signale.await('Message Publishing...')
client.publish(topic, message!, pubOpts.opts, (err) => {
client.publish(topic, message, pubOpts.opts, (err) => {
if (err) {
signale.warn(err)
} else {
Expand All @@ -35,7 +34,7 @@ const send = (

const multisend = (
connOpts: IClientOptions,
pubOpts: { topic: string; message?: string | Buffer; opts: IClientPublishOptions },
pubOpts: { topic: string; message: string | Buffer; opts: IClientPublishOptions },
) => {
const client = mqtt.connect(connOpts)
signale.await('Connecting...')
Expand Down
3 changes: 1 addition & 2 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as mqtt from 'mqtt'
import * as fs from 'fs'
import { signale, msgLog } from '../utils/signale'
import { parseConnectOptions, parseSubscribeOptions } from '../utils/parse'

Expand All @@ -13,7 +12,7 @@ const sub = (options: SubscribeOptions) => {
client.on('connect', () => {
signale.success('Connected')

const subOptsArray = parseSubscribeOptions(options, 'sub')
const subOptsArray = parseSubscribeOptions(options)

const { topic } = options

Expand Down
5 changes: 4 additions & 1 deletion cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ declare global {

interface PublishOptions extends ConnectOptions {
topic: string
message?: string | Buffer
message: string | Buffer
qos: QoS
retain?: boolean
dup?: boolean
Expand All @@ -60,16 +60,19 @@ declare global {
correlationData?: string
subscriptionIdentifier?: number
contentType?: string
connUserProperties?: Record<string, string>
}

interface SubscribeOptions extends ConnectOptions {
topic: string[]
qos?: QoS[]
// properties of MQTT 5.0
no_local?: boolean[]
retainAsPublished?: boolean[]
retainHandling?: QoS[]
subscriptionIdentifier?: number[]
verbose: boolean
connUserProperties?: Record<string, string>
}
}

Expand Down
14 changes: 9 additions & 5 deletions cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ const parsePubTopic = (value: string) => {
return value
}

const parseConnectOptions = (options: ConnectOptions, commandType?: CommandType) => {
const parseConnectOptions = (
options: ConnectOptions | PublishOptions | SubscribeOptions,
commandType?: CommandType,
) => {
const {
mqttVersion,
hostname,
Expand All @@ -93,7 +96,6 @@ const parseConnectOptions = (options: ConnectOptions, commandType?: CommandType)
topicAliasMaximum,
reqResponseInfo,
reqProblemInfo,
userProperties,
willTopic,
willMessage,
willQos,
Expand Down Expand Up @@ -170,14 +172,16 @@ const parseConnectOptions = (options: ConnectOptions, commandType?: CommandType)
if (mqttVersion === 3) {
connectOptions.protocolId = 'MQIsdp'
} else if (mqttVersion === 5) {
const userProperties =
commandType === 'conn' ? options.userProperties : (<PublishOptions | SubscribeOptions>options).connUserProperties
const properties = {
sessionExpiryInterval,
receiveMaximum,
maximumPacketSize,
topicAliasMaximum,
requestResponseInformation: reqResponseInfo,
requestProblemInformation: reqProblemInfo,
userProperties: commandType === 'conn' ? userProperties : undefined,
userProperties,
}

if (clean === false) {
Expand Down Expand Up @@ -239,7 +243,7 @@ const parsePublishOptions = (options: PublishOptions) => {
return { topic, message, opts: publishOptions }
}

const parseSubscribeOptions = (options: SubscribeOptions, commandType?: CommandType) => {
const parseSubscribeOptions = (options: SubscribeOptions) => {
const {
mqttVersion,
topic,
Expand All @@ -264,7 +268,7 @@ const parseSubscribeOptions = (options: SubscribeOptions, commandType?: CommandT
if (mqttVersion === 5) {
const properties = {
subscriptionIdentifier: getSpecialTypesOption(subscriptionIdentifier as number[], index),
userProperties: commandType === 'sub' ? userProperties : undefined,
userProperties,
}

subOptions.properties = Object.fromEntries(
Expand Down

0 comments on commit 718b2df

Please sign in to comment.