Skip to content

Commit

Permalink
feat: promises support (#1644)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando authored Jul 25, 2023
1 parent 8521888 commit d02e176
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 21 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,17 @@ Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:
## API

- [`mqtt.connect()`](#connect)
- [`mqtt.connectAsync()`](#connect-async)
- [`mqtt.Client()`](#client)
- [`mqtt.Client#connect()`](#client-connect)
- [`mqtt.Client#publish()`](#publish)
- [`mqtt.Client#publishAsync()`](#publish-async)
- [`mqtt.Client#subscribe()`](#subscribe)
- [`mqtt.Client#subscribeAsync()`](#subscribe-async)
- [`mqtt.Client#unsubscribe()`](#unsubscribe)
- [`mqtt.Client#unsubscribeAsync()`](#unsubscribe-async)
- [`mqtt.Client#end()`](#end)
- [`mqtt.Client#endAsync()`](#end-async)
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage)
- [`mqtt.Client#reconnect()`](#reconnect)
- [`mqtt.Client#handleMessage()`](#handleMessage)
Expand Down Expand Up @@ -350,6 +355,12 @@ at every connect.
For all MQTT-related options, see the [Client](#client)
constructor.

<a name="connect-async"></a>

### connectAsync([url], options)

Async [`connect`](#connect). Returns a `Promise` that resolves to a `mqtt.Client` instance.

---

<a name="client"></a>
Expand Down Expand Up @@ -574,6 +585,12 @@ Publish a message to a topic
- `callback` - `function (err)`, fired when the QoS handling completes,
or at the next tick if QoS 0. An error occurs if client is disconnecting.

<a name="publish-async"></a>

### mqtt.Client#publishAsync(topic, message, [options])

Async [`publish`](#publish). Returns a `Promise<void>`.

---

<a name="subscribe"></a>
Expand Down Expand Up @@ -601,6 +618,12 @@ Subscribe to a topic or topics
- `topic` is a subscribed to topic
- `qos` is the granted QoS level on it

<a name="subscribe-async"></a>

### mqtt.Client#subscribeAsync(topic/topic array/topic object, [options])

Async [`subscribe`](#subscribe). Returns a `Promise<granted[]>`.

---

<a name="unsubscribe"></a>
Expand All @@ -615,6 +638,12 @@ Unsubscribe from a topic or topics
- `userProperties`: The User Property is allowed to appear multiple times to represent multiple name, value pairs `object`
- `callback` - `function (err)`, fired on unsuback. An error occurs if client is disconnecting.

<a name="unsubscribe-async"></a>

### mqtt.Client#unsubscribeAsync(topic/topic array, [options])

Async [`unsubscribe`](#unsubscribe). Returns a `Promise<void>`.

---

<a name="end"></a>
Expand All @@ -636,6 +665,12 @@ Close the client, accepts the following options:
- `callback`: will be called when the client is closed. This parameter is
optional.

<a name="end-async"></a>

### mqtt.Client#endAsync([force], [options])

Async [`end`](#end). Returns a `Promise<void>`.

---

<a name="removeOutgoingMessage"></a>
Expand Down
109 changes: 97 additions & 12 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
// .publish(topic, payload, cb);
if (typeof opts === 'function') {
callback = opts as DoneCallback
opts = {} as IClientPublishOptions
opts = null
}

opts = opts || {}

// default opts
const defaultOpts: IClientPublishOptions = {
qos: 0,
Expand Down Expand Up @@ -968,9 +970,32 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
callback,
})
}

return this
}

public publishAsync(topic: string, message: string | Buffer): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.publish(topic, message, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* subscribe - subscribe to <topic>
*
Expand Down Expand Up @@ -1192,6 +1217,28 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]> {
return new Promise((resolve, reject) => {
this.subscribe(topicObject, opts, (err, granted) => {
if (err) {
reject(err)
} else {
resolve(granted)
}
})
})
}

/**
* unsubscribe - unsubscribe from topic(s)
*
Expand Down Expand Up @@ -1297,6 +1344,26 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public unsubscribeAsync(topic: string | string[]): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.unsubscribe(topic, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* end - close connection
*
Expand Down Expand Up @@ -1324,25 +1391,21 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('end :: (%s)', this.options.clientId)

if (force == null || typeof force !== 'boolean') {
cb = (opts || this.noop) as DoneCallback
cb = cb || (opts as DoneCallback)
opts = force as Partial<IDisconnectPacket>
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = this.noop
}
}
}

if (typeof opts !== 'object') {
cb = opts
cb = cb || opts
opts = null
}

this.log('end :: cb? %s', !!cb)
cb = cb || this.noop

if (!cb || typeof cb !== 'function') {
cb = this.noop
}

const closeStores = () => {
this.log('end :: closeStores: closing incoming and outgoing stores')
Expand Down Expand Up @@ -1414,6 +1477,28 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public endAsync(): Promise<void>
public endAsync(force?: boolean): Promise<void>
public endAsync(opts?: Partial<IDisconnectPacket>): Promise<void>
public endAsync(
force?: boolean,
opts?: Partial<IDisconnectPacket>,
): Promise<void>
public endAsync(
force?: boolean | Partial<IDisconnectPacket>,
opts?: Partial<IDisconnectPacket>,
): Promise<void> {
return new Promise((resolve, reject) => {
this.end(force as boolean, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
Expand Down Expand Up @@ -1547,7 +1632,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}
}

private _checkDisconnecting(callback: GenericCallback<any>) {
private _checkDisconnecting(callback?: GenericCallback<any>) {
if (this.disconnecting) {
if (callback && callback !== this.noop) {
callback(new Error('client disconnecting'))
Expand Down
66 changes: 65 additions & 1 deletion src/lib/connect/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
/* eslint-disable @typescript-eslint/no-var-requires */
import url from 'url'
import MqttClient, { IClientOptions, MqttProtocol } from '../client'
import MqttClient, {
IClientOptions,
MqttClientEventCallbacks,
MqttProtocol,
} from '../client'
import IS_BROWSER from '../is-browser'
import Store from '../store'
import DefaultMessageIdProvider from '../default-message-id-provider'
Expand Down Expand Up @@ -177,4 +181,64 @@ function connect(
return client
}

function connectAsync(brokerUrl: string): Promise<MqttClient>
function connectAsync(opts: IClientOptions): Promise<MqttClient>
function connectAsync(
brokerUrl: string,
opts?: IClientOptions,
): Promise<MqttClient>
function connectAsync(
brokerUrl: string | IClientOptions,
opts?: IClientOptions,
allowRetries = true,
): Promise<MqttClient> {
return new Promise((resolve, reject) => {
const client = connect(brokerUrl as string, opts)

const promiseResolutionListeners: Partial<MqttClientEventCallbacks> = {
connect: (connack) => {
removePromiseResolutionListeners()
resolve(client) // Resolve on connect
},
end: () => {
removePromiseResolutionListeners()
resolve(client) // Resolve on end
},
error: (err) => {
removePromiseResolutionListeners()
client.end()
reject(err) // Reject on error
},
}

// If retries are not allowed, reject on close
if (allowRetries === false) {
promiseResolutionListeners.close = () => {
promiseResolutionListeners.error(
new Error("Couldn't connect to server"),
)
}
}

// Remove listeners added to client by this promise
function removePromiseResolutionListeners() {
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.off(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
}

// Add listeners to client
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.on(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
})
}

export default connect
export { connectAsync }
1 change: 0 additions & 1 deletion src/lib/connect/tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const buildStream: StreamBuilder = (client, opts) => {
)

const connection = tls.connect(opts)
/* eslint no-use-before-define: [2, "nofunc"] */
connection.on('secureConnect', () => {
if (opts.rejectUnauthorized && !connection.authorized) {
connection.emit('error', new Error('TLS not authorized'))
Expand Down
4 changes: 3 additions & 1 deletion src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import MqttClient from './lib/client'
import DefaultMessageIdProvider from './lib/default-message-id-provider'
import UniqueMessageIdProvider from './lib/unique-message-id-provider'
import Store, { IStore } from './lib/store'
import connect from './lib/connect'
import connect, { connectAsync } from './lib/connect'

export const Client = MqttClient
export {
connect,
connectAsync,
MqttClient,
Store,
DefaultMessageIdProvider,
UniqueMessageIdProvider,
IStore,
}
export * from './lib/client'
export { ReasonCodes } from './lib/handlers/ack'
Loading

0 comments on commit d02e176

Please sign in to comment.