Skip to content

Commit

Permalink
fix(cli): fix user properties option of pub command
Browse files Browse the repository at this point in the history
  • Loading branch information
Red-Asuka authored and ysfscream committed Sep 14, 2022
1 parent 7d65f4f commit bc999a3
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import pump from 'pump'
import concat from 'concat-stream'
import { Writable } from 'readable-stream'
import split2 from 'split2'
import { IClientPublishOptions } from 'mqtt'

const send = (options: any) => {
const send = (options: any, pubOptions: IClientPublishOptions) => {
const client = mqtt.connect(options)
client.on('connect', () => {
client.publish(options.topic, options.message, options, (err) => {
const { topic, message } = options
client.publish(topic, message, pubOptions, (err) => {
if (err) {
console.warn(err)
}
Expand All @@ -21,13 +23,13 @@ const send = (options: any) => {
})
}

const multisend = (options: any) => {
const multisend = (options: any, pubOptions: IClientPublishOptions) => {
const client = mqtt.connect(options)
const sender = new Writable({
objectMode: true,
})
sender._write = (line, _enc, cb) => {
client.publish(options.topic, line.trim(), options, cb)
client.publish(options.topic, line.trim(), pubOptions, cb)
}

client.on('connect', () => {
Expand All @@ -42,11 +44,7 @@ const multisend = (options: any) => {

const pub = (options: any) => {
options.protocolVersion = options.mqttVersion
if (options.protocolVersion === 5) {
if (options.userProperties) {
options.properties = { userProperties: options.userProperties }
}
} else if (options.protocolVersion === 3) {
if (options.protocolVersion === 3) {
options.protocolId = 'MQIsdp'
}

Expand Down Expand Up @@ -78,19 +76,32 @@ const pub = (options: any) => {
options.will.retain = options.willRetain
}

const pubOptions: IClientPublishOptions = {}

if (typeof options.qos === 'number') {
pubOptions.qos = options.qos
}
if (typeof options.retain === 'boolean') {
pubOptions.retain = options.retain
}
if (options.protocolVersion === 5) {
const { userProperties } = options
userProperties && (pubOptions.properties = { userProperties })
}

if (options.stdin) {
if (options.multiline) {
multisend(options)
multisend(options, pubOptions)
} else {
process.stdin.pipe(
concat((data) => {
options.message = data
send(options)
send(options, pubOptions)
}),
)
}
} else {
send(options)
send(options, pubOptions)
}
}

Expand Down

0 comments on commit bc999a3

Please sign in to comment.