Небольшое предисловие, или в чем же боль
В последнее время я активно работаю над приложениями, которые имеют модули работы с Bluetooth по не-очень-хорошо спроектированным протоколам с кастомными устройствами, что периодически добавляет мне интересных угу, как же проблем.
Поскольку я искренний фанат реактивности в приложениях, то такие проблемы приходилось решать собственными силами, поскольку решений в сети просто нет. Совсем. О получившейся архитектуре работы с Bluetooth-устройствами я и хотел бы вам рассказать.
Опасности на пути джедая
Первый важный момент, о котором должен помнить разработчик, при работе с Bluetooth – пакеты могут повреждаться по пути. А еще – они могут сопровождаться шумом. И это не один случай из миллиона, подобные явления могут встречаться довольно часто, и их нужно обрабатывать. Еще блютус может отключиться, или не подключиться, или сделать вид что подключился, но на самом то деле мы знаем, что это ничего не значит...
В качестве примера решения этих задач, спроектируем микро-фреймворк для процессинга эвентов, которые детерминируются по типам с помощью шапки (первые N байт) и валидируются с помощью какой-нибудь простенькой чек-суммы. Для того, чтобы не загромождать код, примем допущение, что шапка по протоколу имеет фиксированный размер. Все пакеты же разделим на два типа: с фиксированной длиной, и с динамической, передаваемой отдельным байтом.
Проектирование
Начнем с описания возможных эвентов в приложении. Итак, общая абстракция будет выглядеть примерно так, с учетом принятых ограничений:
sealed class Event {
val headSize: Int = 2
abstract val head: ByteArray
abstract fun isCorrupted(): Boolean
//To be continued
}
Далее, когда мы определили наборы постоянных свойств для всех пакетов, требуется как-либо формализовать условия, при которых мы:
- Посчитаем, что пакет принадлежит какому-либо типу
- Должны добавить в буффер байт, так как пока что пакет не собирается
- Должны грохнуть буффер, так как какие-либо условия для его сборки не выполнились (этот пункт нужен скорее для подстраховки, лучше добавить туда логи во время тестирования приложения, чтобы проверять полноту остальных условий)
- Пробуем собрать пакет из буффера и проверяем его валидность
Данные четыре условия приводят нас к интерфейсу следующего вида:
interface EventMatcher {
val headSize: Int
fun matches(packet: ByteBuffer): Boolean
fun create(packet: ByteBuffer): Event
fun shouldBuffer(packet: ByteBuffer): Boolean
fun shouldDrop(packet: ByteBuffer): Boolean
}
Создадим компонент, который будет предоставлять сказал бы, что удобный, но это оставлю на ваше усмотрение прокси-интерфейс к нашим матчерам для всех существующих типов, ничего выдающегося, код под катом:
class EventMatchersAdapter {
private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>()
fun register(event: KClass<out Event>, matcher: EventMatcher)
= apply { matchers.put(event, matcher) }
fun unregister(event: KClass<out Event>)
= apply { matchers.remove(event) }
fun knownEvents(): List<KClass<out Event>>
= matchers.keys.toList()
fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.matches(packet) ?: false
fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.shouldBuffer(packet) ?: false
fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.shouldDrop(packet) ?: false
fun create(packet: ByteBuffer, event: KClass<out Event>): Event?
= matchers[event]?.create(packet)
}
В пакетах опишем способ определения того, был данный пакет поврежден или нет. Это довольно удобный подход, который позволяет не сильно страдать из-за плохо спроектированного протокола, в котором инженеру вздумалось закинуть вам сотню способов проверки пакетов на корректность, для каждого по несколько.
data class A(override val head: ByteArray,
val payload: ByteArray,
val checksum: Byte): Event() {
companion object {
//(two bytes of head) + (2 bytes of payload) + (byte of checksum)
@JvmStatic val length = 5.toByte()
@JvmStatic val headValue = byteArrayOf(0x00, 0x00)
@JvmStatic val matcherValue = object: EventMatcher {
override val headSize: Int = 2
override fun matches(packet: ByteBuffer): Boolean {
if(packet.position() == 0) return true
if(packet.position() == 1) return packet[0] == headValue[0]
return packet[0] == headValue[0]
&& packet[1] == headValue[1]
}
override fun create(packet: ByteBuffer): A {
packet.rewind()
return A(
ByteArray(2, { packet.get() }),
ByteArray(2, { packet.get() }),
packet.get()
)
}
override fun shouldBuffer(packet: ByteBuffer): Boolean
= packet.position() < length
override fun shouldDrop(packet: ByteBuffer): Boolean
= packet.position() > length
}
}
override fun isCorrupted(): Boolean = checksumOf(payload) != checksum
override fun equals(other: Any?): Boolean {
if(other as? A == null) return false
other as A
return Arrays.equals(head, other.head)
&& Arrays.equals(payload, other.payload)
&& checksum == other.checksum
}
override fun hashCode(): Int {
var result = Arrays.hashCode(head)
result = result * 31 + Arrays.hashCode(payload)
result = result * 31 + checksum.hashCode()
return result
}
}
data class C(override val head: ByteArray,
val length: Byte,
val payload: ByteArray,
val checksum: Byte): Event() {
companion object {
@JvmStatic val headValue = byteArrayOf(0x01, 0x00)
@JvmStatic val matcherValue = object: EventMatcher {
override val headSize: Int = 2
override fun matches(packet: ByteBuffer): Boolean {
if(packet.position() == 0) return true
if(packet.position() == 1) return packet[0] == headValue[0]
return packet[0] == headValue[0]
&& packet[1] == headValue[1]
}
override fun create(packet: ByteBuffer): C {
packet.rewind()
val msb = packet.get()
val lsb = packet.get()
val length = packet.get()
return C(
byteArrayOf(msb, lsb),
length,
packet.take(3, length.toPositiveInt()),
packet.get()
)
}
override fun shouldBuffer(packet: ByteBuffer): Boolean
= when(packet.position()) {
in 0..2 -> true
else -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
}
override fun shouldDrop(packet: ByteBuffer): Boolean
= when(packet.position()) {
in 0..2 -> false
else -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
}
}
}
override fun isCorrupted(): Boolean = checksumOf(payload) != checksum
override fun equals(other: Any?): Boolean {
if(other as? C == null) return false
other as C
return Arrays.equals(head, other.head)
&& length == other.length
&& Arrays.equals(payload, other.payload)
&& checksum == other.checksum
}
override fun hashCode(): Int {
var result = Arrays.hashCode(head)
result = result * 31 + length.hashCode()
result = result * 31 + Arrays.hashCode(payload)
result = result * 31 + checksum.hashCode()
return result
}
}
Далее – от нас требуется описать сам алгоритм считывания пакетов, причем такой, который будет:
- Поддерживать несколько различных типов
- Разруливать повреждения пакетов за нас
- Будет дружить с Flowable
Реализация алгоритма скрытого за Subscriber интерфейсом:
class EventsBridge(private val adapter: EventMatchersAdapter,
private val emitter: FlowableEmitter<Event>,
private val bufferSize: Int = 128): DisposableSubscriber<Byte>() {
private val buffers: Map<KClass<out Event>, ByteBuffer>
= mutableMapOf<KClass<out Event>, ByteBuffer>()
.apply {
for(knownEvent in adapter.knownEvents()) {
put(knownEvent, ByteBuffer.allocateDirect(bufferSize))
}
}
.toMap()
override fun onError(t: Throwable) {
emitter.onError(t)
}
override fun onComplete() {
emitter.onComplete()
}
override fun onNext(t: Byte) {
for((key, value) in buffers) {
value.put(t)
adapter.knownEvents()
.filter { it == key }
.forEach {
if (adapter.matches(value, it)) {
when {
adapter.shouldDrop(value, it) -> {
value.clear()
}
!adapter.shouldBuffer(value, it) -> {
val event = adapter.create(value, it)
if (!emitter.isCancelled
&& event != null
&& !event.isCorrupted()) {
release()
emitter.onNext(event)
} else {
value.clear()
}
}
}
} else {
value.clear()
}
}
}
}
private fun release() {
for(buffer in buffers) buffer.value.clear()
}
}
Использование
Рассмотрим на примере прогонки unit-тестов:
@Test
fun test_single_fixedLength() {
val adapter = EventMatchersAdapter()
.register(Event.A::class, Event.A.matcherValue)
val packetA = generateCorrectPacketA()
val testSubscriber = TestSubscriber<Event>()
Flowable.create<Event>(
{ emitter ->
val bridge = EventsBridge(adapter, emitter)
Flowable.create<Byte>({ byteEmitter -> for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge)
},
BackpressureStrategy.BUFFER
)
.subscribe(testSubscriber)
testSubscriber.assertNoErrors()
testSubscriber.assertValue { event ->
event is Event.A && !event.isCorrupted()
}
}
@Test
fun test_multiple_dynamicLength_mixed_withNoise() {
val adapter = EventMatchersAdapter()
.register(Event.C::class, Event.C.matcherValue)
.register(Event.D::class, Event.D.matcherValue)
val packetC1 = generateCorrectPacketC()
val packetD1 = generateCorrectPacketD()
val packetD2 = generateCorruptedPacketD()
val packetC2 = generateCorruptedPacketC()
val testSubscriber = TestSubscriber<Event>()
val random = Random()
Flowable.create<Event>(
{ emitter ->
val bridge = EventsBridge(adapter, emitter)
Flowable.create<Byte>({ byteEmitter ->
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetC1) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetD1) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetD2) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetC2) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
}, BackpressureStrategy.BUFFER).subscribe(bridge)
},
BackpressureStrategy.BUFFER
)
.subscribe(testSubscriber)
testSubscriber.assertNoErrors()
testSubscriber.assertValueCount(2)
}
private fun generateCorrectPacketB(): ByteArray {
val rnd = Random()
val payload = byteArrayOf(
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte()
)
return byteArrayOf(
Event.B.headValue[0],
Event.B.headValue[1],
payload[0],
payload[1],
payload[2],
payload[3],
checksumOf(payload)
)
}
private fun generateCorrectPacketC(): ByteArray {
val rnd = Random()
val payload = List(rnd.nextInt(16), { index ->
rnd.nextInt().toByte()
}).toByteArray()
return ByteArray(4 + payload.size, { index ->
when(index) {
0 -> Event.C.headValue[0]
1 -> Event.C.headValue[1]
2 -> payload.size.toByte()
in 3..(4 + payload.size - 2) -> payload[index - 3]
4 + payload.size - 1 -> checksumOf(payload)
else -> 0.toByte()
}
})
}
private fun generateCorruptedPacketB(): ByteArray {
val rnd = Random()
val payload = byteArrayOf(
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte()
)
return byteArrayOf(
Event.B.headValue[0],
Event.B.headValue[1],
payload[0],
payload[1],
payload[2],
payload[3],
(checksumOf(payload) + 1.toByte()).toByte()
)
}
private fun generateCorruptedPacketC(): ByteArray {
val rnd = Random()
val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray()
return ByteArray(4 + payload.size, { index ->
when(index) {
0 -> Event.C.headValue[0]
1 -> Event.C.headValue[1]
2 -> payload.size.toByte()
in 3..(4 + payload.size - 2) -> payload[index - 3]
else -> (checksumOf(payload) + 1.toByte()).toByte()
}
})
}
inline fun checksumOf(data: ByteArray): Byte {
var result = 0x00.toByte()
for(b in data) {
result = (result + b).toByte()
}
return (result.inv() + 1.toByte()).toByte()
}
И зачем все это было нужно?
На этом примере, мне хотелось бы показать, как легко и непринужденно можно поддерживать модульность при обработке почти что произвольных событий, к слову, не обязательно пришедших из Bluetooth источника (никакого Bluetooth-зависимого кода пока-что не было), при этом избегая возможных повреждений пакетов и зашумления канала связи.
И что дальше?
Сделаем небольшую обертку над RxBluetooth, которая позволит нам в реактивном стиле работать с различными подключениями, слушая различные наборы эвентов.
Весь код условно можно разделить на три набора компонент: два сервиса и один репозиторий.
Сервисы будут у нас предоставлять подключение и работу с данными по подключению соответственно, а репозиторий – предоставлять абстракцию для работы с конкретными подключениями и выступать в роли неявного flyweight-а подключений.
Интерфейсы будут примерно следующими:
interface ConnectivityService {
fun sub(service: UUID): Observable<DataService>
}
interface DataService {
fun sub(): Flowable<Event>
fun write(data: ByteArray): Boolean
fun dispose()
}
interface DataRepository {
fun sub(serviceUUID: UUID): Flowable<Event>
fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
fun dispose()
}
И, соответственно, реализации под катом
class ConnectivityServiceImpl(private val bluetooth: RxBluetooth,
private val events: EventMatchersAdapter,
private val timeoutSeconds: Long = 15L): ConnectivityService {
override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) {
false -> Observable.empty()
else -> {
ensureBluetoothNotDiscovering()
bluetooth.startDiscovery()
bluetooth.observeDevices()
.filter { device -> device.uuids.contains(ParcelUuid(service)) }
.timeout(timeoutSeconds, TimeUnit.SECONDS)
.take(1)
.doOnNext { _ -> ensureBluetoothNotDiscovering() }
.doOnError { _ -> ensureBluetoothNotDiscovering() }
.doOnComplete { -> ensureBluetoothNotDiscovering() }
.flatMap { device -> bluetooth.observeConnectDevice(device, service) }
.map { connection -> DataServiceImpl(BluetoothConnection(connection), events) }
}
}
private fun ensureBluetoothNotDiscovering() {
if(bluetooth.isDiscovering) {
bluetooth.cancelDiscovery()
}
}
}
class DataServiceImpl constructor(private val connection: BluetoothConnection,
private val adapter: EventMatchersAdapter): DataService {
override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter ->
val underlying = EventsBridge(adapter = adapter, emitter = emitter)
emitter.setDisposable(object: MainThreadDisposable() {
override fun onDispose() {
if(!underlying.isDisposed) {
underlying.dispose()
}
}
})
connection.observeByteStream().subscribe(underlying)
}, BackpressureStrategy.BUFFER)
override fun write(data: ByteArray): Boolean
= connection.send(data)
override fun dispose()
= connection.closeConnection()
}
class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository {
private val services = ConcurrentHashMap<UUID, DataService>()
override fun sub(serviceUUID: UUID): Flowable<Event>
= serviceOf(serviceUUID)
.flatMap { service -> service.sub() }
override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
= serviceOf(serviceUUID)
.map { service -> service.write(data) }
override fun dispose() {
for((_, service) in services) {
service.dispose()
}
}
private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) {
when(this) {
null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER)
else -> Flowable.just(this)
}
}
}
И таким образом, в минимальное количество строк, мы получаем возможность делать то, что обычно растягивалось в жуткие цепочки вызовов, или коллбэк-хэлл примерно следующим образом:
repository.sub(UUID.randomUUID())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { event ->
when(event) {
is Event.A -> doSomeStuffA(event)
is Event.B -> doSomeStuffB(event)
is Event.C -> doSomeStuffC(event)
is Event.D -> doSomeStuffD(event)
}
}
11 строк для прослушки четырех событий от произвольного устройства, неплохо, не правда ли?)
Вместо заключения
Если у кого-то из читающих возникнет желание посмотреть на исходники – они лежат здесь.
Если кому-то захочется посмотреть, как впишутся другие правила для образования пакетов из сырых байт – пишите, попробуем добавить.
LionZXY
Очень интересно. Не хотите сделать на основе этого библиотеку? Думаю, она была бы достаточно популярна. Сейчас довольно много работает людей с Bluetooth
KomarovI Автор
Да, только есть проблема: если делать sealed class для эвентов – оформить в фрейм не получится. Если не делать – не будет exhaustive when работать, разве что интерфейсом заглушать. На неделе постараюсь выложить бету фреймворка для произвольных эвентов заглушенных таким способом, потом может что-то получится придумать. Как только что-то соображу – сразу добавлю в статью линк.
LionZXY
Смотрели в сторону protobuf?
KomarovI Автор
Подумываю, но в наших проектах скорее всего будут использоваться кастомные протоколы производителей оборудования, как минимум из-за того, что разработка собственных и поддержка с аппаратной стороны требует ресурсов, которые могут уйти, кхм, на более полезные действия.
А если вы про фреймворки, то вроде бы у protobuf с этим все хорошо, хотя не могу быть уверен, пока что руки не добрались до него.
LionZXY
Т.е. использование стандартного протокола слишком накладно, поэтому каждый производитель использует свой?)
Protobuf позволяет описывать модели на своём псевдоязыке и транслировать потом эту модель данных для различных ЯП. Библиотека достаточно гибкая, так что может её получиться настроить под странные фантазии протоколов производителей.
KomarovI Автор
Проблема на аппаратном уровне (зашумление, потери). А по протоколам – речь идет о протоколах поверх Bluetooth (формате передаваемых данных – пакетов).