Skip to content

Commit

Permalink
fix: use explicit connect packet and infer types from mqtt-packet (
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando authored Jul 24, 2023
1 parent 0f29bff commit 2a49ed3
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 159 deletions.
259 changes: 102 additions & 157 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import mqttPacket, {
IUnsubscribePacket,
Packet,
QoS,
UserProperties,
ISubackPacket,
IConnectPacket,
} from 'mqtt-packet'
Expand Down Expand Up @@ -120,6 +119,8 @@ export type AckHandler = (
) => void

export interface IClientOptions extends ISecureClientOptions {
/** CLIENT PROPERTIES */

/** Encoding to use. Example 'binary' */
encoding?: BufferEncoding
/** Set browser buffer size. Default to 512KB */
Expand All @@ -134,8 +135,9 @@ export interface IClientOptions extends ISecureClientOptions {
authPacket?: Partial<IAuthPacket>
/** Disable/Enable writeToStream.cacheNumbers */
writeCache?: boolean

/** Should be set to `host` */
servername?: string
/** The default protocol to use when using `servers` and no protocol is specified */
defaultProtocol?: MqttProtocol
/** Support clientId passed in the query string of the url */
query?: Record<string, string>
Expand All @@ -156,26 +158,7 @@ export interface IClientOptions extends ISecureClientOptions {

/** Websocket options */
wsOptions?: ClientOptions | ClientRequestArgs | DuplexOptions
/**
* 10 seconds, set to 0 to disable
*/
keepalive?: number
/**
* 'mqttjs_' + Math.random().toString(16).substr(2, 8)
*/
clientId?: string
/**
* 'MQTT'
*/
protocolId?: string
/**
* 4
*/
protocolVersion?: number
/**
* true, set to false to receive QoS 1 and 2 messages while offline
*/
clean?: boolean

/**
* 1000 milliseconds, interval between two reconnections
*/
Expand All @@ -184,14 +167,7 @@ export interface IClientOptions extends ISecureClientOptions {
* 30 * 1000 milliseconds, time to wait before a CONNACK is received
*/
connectTimeout?: number
/**
* the username required by your broker, if any
*/
username?: string
/**
* the password required by your broker, if any
*/
password?: Buffer | string

/**
* a Store for the incoming packets
*/
Expand All @@ -207,10 +183,16 @@ export interface IClientOptions extends ISecureClientOptions {
/** Custom log function, default uses `debug` */
log?: (...args: any[]) => void

/** automatically use topic alias */
autoUseTopicAlias?: boolean

/** automatically assign topic alias */
autoAssignTopicAlias?: boolean

/** Set to false to disable ping reschedule. When enabled ping messages are rescheduled on each message sent */
reschedulePings?: boolean

/** List of broker servers. On each reconnect try the next server will be used */
servers?: Array<{
host: string
port: number
Expand All @@ -228,67 +210,67 @@ export interface IClientOptions extends ISecureClientOptions {
* true, set to false to disable re-subscribe functionality
*/
resubscribe?: boolean
/**
* a message that will sent by the broker automatically when the client disconnect badly.
*/
will?: {
/**
* the topic to publish
*/
topic: string
/**
* the message to publish
*/
payload: Buffer | string
/**
* the QoS
*/
qos: QoS
/**
* the retain flag
*/
retain: boolean
/*
* properies object of will
* */
properties?: {
willDelayInterval?: number
payloadFormatIndicator?: boolean
messageExpiryInterval?: number
contentType?: string
responseTopic?: string
correlationData?: Buffer
userProperties?: UserProperties
}

authPacket?: any

/** Prevent to call `connect` in constructor */
manualConnect?: boolean
}

/** when defined this function will be called to transform the url string generated by MqttClient from provided options */
transformWsUrl?: (
url: string,
options: IClientOptions,
client: MqttClient,
) => string

/** see `connect` packet: https://github.com/mqttjs/mqtt-packet/blob/master/types/index.d.ts#L65 */
properties?: {
sessionExpiryInterval?: number
receiveMaximum?: number
maximumPacketSize?: number
topicAliasMaximum?: number
requestResponseInformation?: boolean
requestProblemInformation?: boolean
userProperties?: UserProperties
authenticationMethod?: string
authenticationData?: Buffer
}
/** Custom message id provider */
messageIdProvider?: IMessageIdProvider

/** When using websockets, this is the timeout used when writing to socket. Default 1000 (1s) */
browserBufferTimeout?: number

/**
* When using websockets, this sets the `objectMode` option.
* When in objectMode, streams can push Strings and Buffers
* as well as any other JavaScript object.
* Another major difference is that when in objectMode,
* the internal buffering algorithm counts objects rather than bytes.
* This means if we have a Transform stream with the highWaterMark option set to 5,
* the stream will only buffer a maximum of 5 objects internally
*/
objectMode?: boolean

/** CONNECT PACKET PROPERTIES */

/**
* 'mqttjs_' + Math.random().toString(16).substr(2, 8)
*/
clientId?: string
/**
* 3=MQTT 3.1 4=MQTT 3.1.1 5=MQTT 5.0. Defaults to 4
*/
protocolVersion?: IConnectPacket['protocolVersion']
/**
* 'MQTT'
*/
protocolId?: IConnectPacket['protocolId']
/**
* true, set to false to receive QoS 1 and 2 messages while offline
*/
clean?: boolean
/**
* 10 seconds, set to 0 to disable
*/
keepalive?: number
/**
* the username required by your broker, if any
*/
username?: string
/**
* the password required by your broker, if any
*/
password?: Buffer | string
/**
* a message that will sent by the broker automatically when the client disconnect badly.
*/
will?: IConnectPacket['will']
/** see `connect` packet: https://github.com/mqttjs/mqtt-packet/blob/master/types/index.d.ts#L65 */
properties?: IConnectPacket['properties']
}

export interface IClientPublishOptions {
Expand All @@ -307,55 +289,13 @@ export interface IClientPublishOptions {
/*
* MQTT 5.0 properties object
*/
properties?: {
payloadFormatIndicator?: boolean
messageExpiryInterval?: number
topicAlias?: number
responseTopic?: string
correlationData?: Buffer
userProperties?: UserProperties
subscriptionIdentifier?: number
contentType?: string
}
properties?: IPublishPacket['properties']
/**
* callback called when message is put into `outgoingStore`
*/
cbStorePut?: StorePutCallback
}
export interface IClientSubscribeOptions {
/**
* the QoS
*/
qos: QoS
/*
* no local flag
* */
nl?: boolean
/*
* Retain As Published flag
* */
rap?: boolean
/*
* Retain Handling option
* */
rh?: number
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: {
subscriptionIdentifier?: number
userProperties?: UserProperties
}
}
export interface IClientSubscribeProperties {
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: {
subscriptionIdentifier?: number
userProperties?: UserProperties
}
}

export interface IClientReconnectOptions {
/**
* a Store for the incoming packets
Expand All @@ -366,16 +306,18 @@ export interface IClientReconnectOptions {
*/
outgoingStore?: Store
}
export interface IClientSubscribeProperties {
/*
* MQTT 5.0 properies object of subscribe
* */
properties?: ISubscribePacket['properties']
}

export interface ISubscriptionGrant {
/**
* is a subscribed to topic
*/
topic: string
export interface IClientSubscribeOptions extends IClientSubscribeProperties {
/**
* is the granted qos level on it, may return 128 on error
* the QoS
*/
qos: QoS | number
qos: QoS
/*
* no local flag
* */
Expand All @@ -389,41 +331,26 @@ export interface ISubscriptionGrant {
* */
rh?: number
}
export interface ISubscriptionRequest extends IClientSubscribeProperties {
export interface ISubscriptionRequest extends IClientSubscribeOptions {
/**
* is a subscribed to topic
*/
topic: string
/**
* is the granted qos level on it
*/
qos: QoS
/*
* no local flag
* */
nl?: boolean
/*
* Retain As Published flag
* */
rap?: boolean
/*
* Retain Handling option
* */
rh?: number
}

export interface ISubscriptioOptions extends IClientSubscribeProperties {
qos: QoS
nl?: boolean
rap?: boolean
rh?: number
export interface ISubscriptionGrant
extends Omit<ISubscriptionRequest, 'qos' | 'properties'> {
/**
* is the granted qos level on it, may return 128 on error
*/
qos: QoS | 128
}

export type ISubscriptionMap = {
/**
* object which has topic names as object keys and as value the options, like {'test1': {qos: 0}, 'test2': {qos: 2}}.
*/
[topic: string]: ISubscriptioOptions
[topic: string]: IClientSubscribeOptions
} & {
resubscribe?: boolean
}
Expand Down Expand Up @@ -849,8 +776,26 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

// Send a connect packet
this.log('connect: sending packet `connect`')
const connectPacket: IConnectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'

const connectPacket: IConnectPacket = {
cmd: 'connect',
protocolId: this.options.protocolId,
protocolVersion: this.options.protocolVersion,
clean: this.options.clean,
clientId: this.options.clientId,
keepalive: this.options.keepalive,
username: this.options.username,
password: this.options.password as Buffer,
properties: this.options.properties,
}

if (this.options.will) {
connectPacket.will = {
...this.options.will,
payload: this.options.will?.payload as Buffer,
}
}

if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
Expand Down Expand Up @@ -1201,7 +1146,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
const topics = []
subs.forEach((sub) => {
if (this.options.reconnectPeriod > 0) {
const topic: ISubscriptioOptions = { qos: sub.qos }
const topic: IClientSubscribeOptions = { qos: sub.qos }
if (version === 5) {
topic.nl = sub.nl || false
topic.rap = sub.rap || false
Expand Down
4 changes: 2 additions & 2 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import handlePublish from '../src/lib/handlers/publish'
import {
IClientOptions,
IClientPublishOptions,
ISubscriptioOptions,
IClientSubscribeOptions,
ISubscriptionMap,
ISubscriptionRequest,
} from '../src/lib/client'
Expand Down Expand Up @@ -2223,7 +2223,7 @@ export default function abstractTest(server, config) {
it('should accept an options parameter', function _test(done) {
const client = connect()
const topic = 'test'
const opts: ISubscriptioOptions = { qos: 1 }
const opts: IClientSubscribeOptions = { qos: 1 }

client.once('connect', () => {
client.subscribe(topic, opts)
Expand Down

0 comments on commit 2a49ed3

Please sign in to comment.