Всем привет! Это один из первых моих постов, поэтому не судите строго. Сегодня хочу поделиться тем как мы добавляли поддержку сокета в наше приложение. Так получилось, что сокет у нас односторонний и отправлять нам ничего в него не нужно, но это еще не значит, что проблем стало меньше.
Немного о себе
Являюсь лидом андроид мобильной команды разработки в финтех компании Peter Partner. Мы реализовали систему по автоматизации торговли, которая интегрирована с крупными торговыми брокерами. Проект локализован на множество языков и им пользуется свыше 1 млн. человек в странах Азии, Африки и Южной Америки.
Что мы хотели?
Открыть сокет и получать из него информацию. Отправлять нам ничего не нужно было.
Что имеем?
KMP приложение для Android/iOS
KotlinFlow
MVI
Архитектура решения
Итак начнем. Первым делом с командой обсудили, что мы хотим от сокета. Выяснилось, что не так и много:
incoming: Flow<Result<T>> - собственно сообщения которые падают в сокет. Почему Result скажу чуть позже, почему T думаю не надо объяснять
connectState: Flow<ConnectState> - состояние подключения к сети сокета
isOpen: Boolean - еще одно состояние сокета ниже объясню почему их два
fun close() - тут все просто, нам нужно уметь закрывать сокет.
Открывать сокет без использования
suspend
- нужно для возможности использовать его как поле в местах где это было нужно.
Получился вот такой интерфейс
/**
* Web socket session with receive type [I]. Open automatically with subscribe on [incoming]
*/
interface WSSession<T> {
/**
* Incoming messages flow
*/
val incoming: Flow<Result<T>>
/**
* Flush and close WS
*/
suspend fun close()
/**
* WS network connect state. Empty before first connect
*/
val connectState: Flow<ConnectState>
/**
* return true while not called [close] after return false.
*
* while socket closing also return false
*
* while socket not open first time also return true
*/
val isOpen: Boolean
}
enum class ConnectState {
Connected,
Disconnected
}
Реализация
Полный пример
inline fun <reified T> HttpClient.wsSession(
url: String,
connectivityProvider: ConnectivityProvider,
): WSSession<T> = object : WSSession<T> {
private var isClosed = false
private var innerWSSession: WebSocketSession? = null
override val connectState = MutableSharedFlow<ConnectState>(1)
override val isOpen: Boolean
get() = !isClosed
override val incoming: Flow<Result<T>> = flow<Result<T>> {
if (isClosed) return@flow
val ws = webSocketSession(urlString = url)
innerWSSession = ws
if (isClosed) {
ws.close()
emit(Result.failure(NoData))
return@flow
}
connectState.emit(ConnectState.Connected)
emitAll(ws.incoming.receiveAsFlow().map { frame -> requestWrapperWs(frame) })
}.retry { cause: Throwable ->
if (cause is IOException || cause is HttpRequestTimeoutException) {
connectState.emit(ConnectState.Disconnected)
connectivityProvider.waitConnect()
}
true
}.filter { isOpen }
override suspend fun close() {
isClosed = true
innerWSSession?.close()
}
}
suspend inline fun <reified T> requestWrapperWs(
frame: Frame,
): Result<T> = requestWrapperWs(frame) {
when (this) {
is Frame.Close -> throw CancellationException(this.data.decodeToString())
is Frame.Ping -> throw NoData
is Frame.Pong -> throw NoData
is Frame.Text -> commonJsonConfig.decodeFromString(this.readText())
is Frame.Binary -> commonJsonConfig.decodeFromString(this.data.decodeToString())
else -> throw NoData
}
}
suspend inline fun <reified T> requestWrapperWs(
frame: Frame,
modifier: Frame.() -> T,
): Result<T> = catching { frame.modifier() }
val commonJsonConfig = Json {
ignoreUnknownKeys = true
allowSpecialFloatingPointValues = true
}
/**
* no data - Anywhere. When data not set
*/
object NoData : Throwable()
Теперь немного детальнее о Result
Т.к. формат сообщений хоть и ожидается нужный, но никто этого гарантировать не может. Поэтому было решено использовать Result
как сущность которая имеет валидное и не очень значение. Под не валидным подразумеваются такие вещи как Ping, Close, сообщения которые мы не обрабатываем.
Зачем нам два состояния isOpen
?
Изначально хотели сделать только одно поле connectState
,но потом поняли, что это не одно и тоже. Что же такое isOpen
и connectState и почему их два?
isOpen
- используется как флаг того что сокет должен быть открыт. Имеется ввиду, что сейчас он может быть не открыт (еще, нету интернета, какая-то ошибка внутри или любая другая причина), но сокет будет пытаться переоткрыться и хранить в себе данные до тех пор пока это нам нужно.
connectState
- используется как флаг того, что сокет реально открыт и данные по нему ходят.
Внимательный читатель мог заметить, что есть такое поле как connectivityProvider
так вот его реализация может быть любая на ваше усмотрение. Нам нужен от него только метод waitConnect
, который является suspend функцией и ждет когда система скажет, что у нас есть доступ к интернету.
Из интересно в реализации стоит выделить то, что закрыть сокет гораздо большая проблема чем открыть.
Как этим пользоваться?
Пример кода
var lastOpenSession: WSSession<*>? = null
_url.onEach { url ->
lastOpenSession?.close()
val session = httpClient.wsSession(
url = url,
connectivityProvider = connectivityProvider,
)
lastOpenSession = session
session.incoming.onSuccess {
//your code
}.launchIn(CoroutineScope(Dispatchers.Default))
session.connectState.onEach { state ->
_hasProgress.value = state == ConnectState.Disconnected
}.launchIn(this)
}.launchIn(storeScope).invokeOnCompletion {
CoroutineScope(Dispatchers.Default).launch { lastOpenSession?.close() }
}
Почему такая странная реализация вызова? В нашем приложении Store умирает как только пользователь выходит с экрана. Поэтому метод close
не будет вызван корректно. Пришлось придумать некоторый костыль если его можно считать таковым. Данный пример откроет сокет для конкретного URL и если он сменится, то сокет закроется и откроется новый.
Итог
Спасибо всем кто дочитал до конца! Надеюсь я кому-то помог открыть свой первый сокет. Пишите в комментариях, что можно улучшить как по коду так и по статье.
Комментарии (4)
Veygard
11.12.2023 14:39В "override val incoming()" разве вторая "if (isClosed){} " будет выполняется? По идее при первой уже должно в ретурн уходить.
wilerat
Спасибо, интересно. Такие вопросы появились:
Может лучше стоило назвать параметр isOpen как нибудь более понятно, к примеру
val isConnectedOrConnecting: Boolean
?Может вообще в статусе
ConnectState
стоило сделать три состояния -Connected
,Connecting
,Disconnected
?nin-jin
shouldBeOpened тогда уж.
kazachenko_ka Автор
Спасибо большое за комментарий! Как написал nin-jin
shouldBeOpened
больше подходит по смыслу, но мне больше нравится идея с доп. состояниями дляConnectState.
На досуге перепишу, в комментариях скину чего получилось.