-
Notifications
You must be signed in to change notification settings - Fork 582
Tutorial: Connection Observable sharing
* This page contains opinions — the reader may not agree with them
The library is designed to not allow for subscribing to RxBleDevice.establishConnection()
twice or more if the previously established connection has not been ended. An BleAlreadyConnectedException
is emitted to the second or next subscribers.
Quite often it happens that communication protocol between a BLE central and a peripheral is using a request-response pattern. Sometimes the response itself contains no information about the request that triggered it so it is only understandable with the request. A code that could model this looks like this:
Single.zip(
rxBleConnection.writeCharacteristic(uuid, data), // write the request
characteristicNotificationObservable.take(1).singleOrError(), // and await the first response
{ requestData, responseData -> /* interpretation */ }
)
The problem is that if one would try to call this code two times around the same moment the responses could get mixed. These situations tend to happen seemingly at random so they are hard to debug. To prevent them the BleAlreadyConnectedException
is emitted.
You may think:
Ok, I am already aware of this possibility but I still need to share the connection — how can I continue?
There are three major ways of of interacting with RxBleConnection
from different entry points. Starting from the best (in the opinion of the library's author) to the worst in terms of maintainability.
An approach derived from an excellent Jake Wharton's talk about Managing State with RxJava.
- Whole flow specified in a single place
- Easiest to follow and comprehend once familiar with Rx programming
- No state managing
- Most control
- Least error prone
- May be difficult for beginners
Click to expand
Every app tends to have a unique data flow. Let's say this time there is a need to do one thing when the connection is established, a thing that is not related to the connection and a second thing on the connection. All the things should happen during a single connection.
rxBleDevice.establishConnection(false)
.flatMapCompletable { connection ->
Completable.concat(listOf(
connection.discoverServices().ignoreElement(), // one thing with connection
otherCompletableNotRelatedToConnection,
connection.writeCharacteristic(uuid, data).ignoreElement() // second thing with connection
))
}
.subscribe()
2. Multiple subscribes with RxReplayingShare
Already mentioned Jake Wharton has published a library that allows for easy sharing of an Observable
. The difference between standard RxJava .share()
operator and this utility is that if an emission from the shared Observable
has already happened before a second subscription the previous emission would get replayed.
- Easy to follow given all
.subscribe()
points - No state managing
- Lack of control over how many connections interactions will happen
Click to expand
val sharedConnectionObs = rxBleDevice.establishConnection(false).compose(ReplayingShare.instance())
sharedConnectionObs.subscribe({ /* ignored */ }, { /* ignored */ } // used only to keep the connection alive during otherCompletableNotRelatedToConnection below
Completable.concat(listOf(
sharedConnectionObs.flatMap { /* do something with the connection */ }.take(1).ignoreElements(),
otherCompletableNotRelatedToConnection,
sharedConnectionObs.flatMap { /* do something else with the connection */ }.take(1).ignoreElements()
)).subscribe()
Over how many connections the above Completable
will happen?
One cannot be sure. If the sharedConnectionObs
would disconnect while otherCompletableNotRelatedToConnection
is running the second sharedConnectionObs
would try to connect to the peripheral again. Is this a problem? It may be — depending on the use-case.
- Easy to understand for people coming from object oriented programming background
- Good control
- Stateful (and therefore error prone)
- Poorly models asynchronous nature of connections
Taken from this issue by davordev
Click to expand
The whole idea behind this is to have a single source of truth for active bluetooth connection. So, in order to do anything with the ble device, you should call bleSessionManager.connection
which will either return an active connection or throw an exception (preferably set by developer) notifying the user that the connection isn't established. In order to establish an connection, the following code should be called first:
bleSessionManager.apply {
setMacAddress("MAC_ADDRESS_OF_DEVICE")
setupConnection()
}
Preferably, you can call RxBleDevice.observeConnectionStateChanges() to track the connection state.
If you want to terminate the connection, you can either:
- call
bleSessionManager.disconnect()
to close down the connection instantly, or - call
bleSessionManager.disconnectButFirst { [..code to execute..] }
to close down the connection after the code has been executed
BleSessionManager.kt
/**
* Manages Bluetooth connection.
* @property rxBleClient RxBleClient
* @property mSchedulerProvider SchedulerProvider
* @property eventStream CompositeDisposable
* @property macAddress String?
* @property disconnectTriggerSubject PublishSubject<Boolean>
* @property _connection Observable<RxBleConnection>?
* @property connection Observable<RxBleConnection>
* @constructor
*/
class BleSessionManager(
val rxBleClient: RxBleClient,
val mSchedulerProvider: SchedulerProvider
) {
private val eventStream: CompositeDisposable = CompositeDisposable()
private var macAddress: String? = null
private val disconnectTriggerSubject: PublishSubject<Boolean> = PublishSubject.create<Boolean>()
private var _connection: Observable<RxBleConnection>? = null
var connection: Observable<RxBleConnection>
get() {
return _connection ?: connectionFallback()
}
private set(value) {
this._connection = value
}
/**
* Fallback in case connection isn't valid.
* @return Observable<RxBleConnection>
*/
private fun connectionFallback(): Observable<RxBleConnection> {
return Observable.error(NotConnectedException())
}
/**
* Used to set mac address.
* @param macAddress String
*/
fun setMacAddress(macAddress: String) {
this.macAddress = macAddress
}
/**
* Configures connection. Called after `setMacAddress()`.
* @throws NotFoundException May throw NotFoundException if mac address is not set.
*/
fun setupConnection() {
val address = this.macAddress ?: throw NotFoundException()
val observable = rxBleClient.getBleDevice(address)
.establishConnection(true, Timeout(BLE_CONNECT_OPERATION_TIMEOUT, TimeUnit.SECONDS))
.takeUntil(disconnectTriggerSubject)
.compose(ReplayingShare.instance())
// Subscribe actions should be implemented even though they have no implementation.
val disposable = observable
.subscribeOn(mSchedulerProvider.io())
.retry(10)
.subscribe(
{},
{},
{}
)
eventStream.add(disposable)
connection = observable
}
/**
* Shuts down the connection and clears event stream.
*/
private fun closeConnection() {
disconnectTriggerSubject.onNext(true)
eventStream.clear()
}
/**
* Nullifies connection. Triggered by End User.
*/
fun disconnect() {
closeConnection()
_connection = null
}
/**
* Nullifies connection, but executes the given code before. Triggered by End User.
*/
fun disconnectButFirst(extensionCode: () -> Unit) {
extensionCode()
disconnect()
}
}
BluetoothService.kt
class BluetoothService(
val bluetoothClient: RxBleClient,
val bluetoothScanSettings: ScanSettings,
val mSchedulerProvider: SchedulerProvider,
val bleSessionManager: BleSessionManager
) : IBluetoothService {
private fun observeConnection(device: RxBleDevice): Observable<ConnectionState> {
return device
.observeConnectionStateChanges()
.throttleLatest(500, TimeUnit.MILLISECONDS)
.subscribeOn(mSchedulerProvider.io())
.map { it.mapToConnectionState() }
}
override fun connectToDevice(macAddress: String): Observable<ConnectionState> {
val device = bluetoothClient.getBleDevice(macAddress)
bleSessionManager.setMacAddress(macAddress)
bleSessionManager.setupConnection()
return observeConnection(device)
}
override fun getDeviceShadow(): Single<HelixDeviceShadow> {
return bleSessionManager.connection
.subscribeOn(mSchedulerProvider.io())
.firstOrError()
.flatMap { it.readCharacteristic(MY_UUID) }
.map {
it.mapToMyResponse()
}
.doOnError { it.mapBluetoothGattOperationExceptions() }
}
override fun disconnect() {
var disposable: Disposable? = null
disposable = bleSessionManager.connection
.subscribeOn(mSchedulerProvider.io())
.firstOrError()
.flatMap {
it.writeCharacteristic(SLEEP_UUID, putHelixToSleep())
}
.ignoreElement()
.onErrorComplete()
.doFinally {
disposable?.dispose()
otaUpdateDisposables.clear()
bleSessionManager.disconnect()
}
.subscribe()
}