Мы все похожи на этого кота, когда узнаем новое
Мы все похожи на этого кота, когда узнаем новое

Всем привет! Это один из первых моих постов, поэтому не судите строго. Сегодня хочу поделиться тем как мы добавляли поддержку сокета в наше приложение. Так получилось, что сокет у нас односторонний и отправлять нам ничего в него не нужно, но это еще не значит, что проблем стало меньше.

Немного о себе

Являюсь лидом андроид мобильной команды разработки в финтех компании Peter Partner. Мы реализовали систему по автоматизации торговли, которая интегрирована с крупными торговыми брокерами. Проект локализован на множество языков и им пользуется свыше 1 млн. человек в странах Азии, Африки и Южной Америки.

Что мы хотели?

Открыть сокет и получать из него информацию. Отправлять нам ничего не нужно было.

Что имеем?

  1. KMP приложение для Android/iOS

  2. KotlinFlow

  3. MVI


Архитектура решения

Итак начнем. Первым делом с командой обсудили, что мы хотим от сокета. Выяснилось, что не так и много:

  • incoming: Flow<Result<T>> - собственно сообщения которые падают в сокет. Почему Result скажу чуть позже, почему T думаю не надо объяснять

  • connectState: Flow<ConnectState> - состояние подключения к сети сокета

  • isOpen: Boolean - еще одно состояние сокета ниже объясню почему их два

  • fun close() - тут все просто, нам нужно уметь закрывать сокет.

  • Открывать сокет без использования suspend - нужно для возможности использовать его как поле в местах где это было нужно.

Получился вот такой интерфейс
/**
 * Web socket session with receive type [I]. Open automatically with subscribe on [incoming]
 */
interface WSSession<T> {
    /**
     * Incoming messages flow
     */
    val incoming: Flow<Result<T>>

    /**
     * Flush and close WS
     */
    suspend fun close()

    /**
     * WS network connect state. Empty before first connect
     */
    val connectState: Flow<ConnectState>

    /**
     * return true while not called [close] after return false.
     *
     * while socket closing also return false
     *
     * while socket not open first time also return true
     */
    val isOpen: Boolean
}


enum class ConnectState {
    Connected,
    Disconnected
}

Реализация

Полный пример
inline fun <reified T> HttpClient.wsSession(
    url: String,
    connectivityProvider: ConnectivityProvider,
): WSSession<T> = object : WSSession<T> {
    private var isClosed = false
    private var innerWSSession: WebSocketSession? = null

    override val connectState = MutableSharedFlow<ConnectState>(1)
    override val isOpen: Boolean
        get() = !isClosed

    override val incoming: Flow<Result<T>> = flow<Result<T>> {
        if (isClosed) return@flow
        val ws = webSocketSession(urlString = url)
        innerWSSession = ws
        if (isClosed) {
            ws.close()
            emit(Result.failure(NoData))
            return@flow
        }
        connectState.emit(ConnectState.Connected)
        emitAll(ws.incoming.receiveAsFlow().map { frame -> requestWrapperWs(frame) })
    }.retry { cause: Throwable ->
        if (cause is IOException || cause is HttpRequestTimeoutException) {
            connectState.emit(ConnectState.Disconnected)
            connectivityProvider.waitConnect()
        }
        true
    }.filter { isOpen }

    override suspend fun close() {
        isClosed = true
        innerWSSession?.close()
    }

}


suspend inline fun <reified T> requestWrapperWs(
    frame: Frame,
): Result<T> = requestWrapperWs(frame) {
    when (this) {
        is Frame.Close -> throw CancellationException(this.data.decodeToString())
        is Frame.Ping -> throw NoData
        is Frame.Pong -> throw NoData
        is Frame.Text -> commonJsonConfig.decodeFromString(this.readText())
        is Frame.Binary -> commonJsonConfig.decodeFromString(this.data.decodeToString())
        else -> throw NoData
    }
}

suspend inline fun <reified T> requestWrapperWs(
    frame: Frame,
    modifier: Frame.() -> T,
): Result<T> = catching { frame.modifier() }

val commonJsonConfig = Json {
    ignoreUnknownKeys = true
    allowSpecialFloatingPointValues = true
}

/**
 * no data - Anywhere. When data not set
 */
object NoData : Throwable()

Теперь немного детальнее о Result

Т.к. формат сообщений хоть и ожидается нужный, но никто этого гарантировать не может. Поэтому было решено использовать Result как сущность которая имеет валидное и не очень значение. Под не валидным подразумеваются такие вещи как Ping, Close, сообщения которые мы не обрабатываем.

Зачем нам два состояния isOpen?

Изначально хотели сделать только одно поле connectState ,но потом поняли, что это не одно и тоже. Что же такое isOpen и connectState и почему их два?

isOpen - используется как флаг того что сокет должен быть открыт. Имеется ввиду, что сейчас он может быть не открыт (еще, нету интернета, какая-то ошибка внутри или любая другая причина), но сокет будет пытаться переоткрыться и хранить в себе данные до тех пор пока это нам нужно.

connectState - используется как флаг того, что сокет реально открыт и данные по нему ходят.

Внимательный читатель мог заметить, что есть такое поле как connectivityProvider так вот его реализация может быть любая на ваше усмотрение. Нам нужен от него только метод waitConnect , который является suspend функцией и ждет когда система скажет, что у нас есть доступ к интернету.

Из интересно в реализации стоит выделить то, что закрыть сокет гораздо большая проблема чем открыть.

Как этим пользоваться?

Пример кода
var lastOpenSession: WSSession<*>? = null
_url.onEach { url ->
    lastOpenSession?.close()
    val session = httpClient.wsSession(
        url = url,
        connectivityProvider = connectivityProvider,
    )
    lastOpenSession = session
    session.incoming.onSuccess {
        //your code
    }.launchIn(CoroutineScope(Dispatchers.Default))
    session.connectState.onEach { state ->
        _hasProgress.value = state == ConnectState.Disconnected
    }.launchIn(this)
}.launchIn(storeScope).invokeOnCompletion {
    CoroutineScope(Dispatchers.Default).launch { lastOpenSession?.close() }
}

Почему такая странная реализация вызова? В нашем приложении Store умирает как только пользователь выходит с экрана. Поэтому метод close не будет вызван корректно. Пришлось придумать некоторый костыль если его можно считать таковым. Данный пример откроет сокет для конкретного URL и если он сменится, то сокет закроется и откроется новый.

Итог

Спасибо всем кто дочитал до конца! Надеюсь я кому-то помог открыть свой первый сокет. Пишите в комментариях, что можно улучшить как по коду так и по статье.

Комментарии (4)


  1. wilerat
    11.12.2023 14:39

    Спасибо, интересно. Такие вопросы появились:

    • Может лучше стоило назвать параметр isOpen как нибудь более понятно, к примеру val isConnectedOrConnecting: Boolean ?

    • Может вообще в статусе ConnectState стоило сделать три состояния - Connected, Connecting, Disconnected ?


    1. nin-jin
      11.12.2023 14:39

      shouldBeOpened тогда уж.


    1. kazachenko_ka Автор
      11.12.2023 14:39

      Спасибо большое за комментарий! Как написал nin-jin shouldBeOpened больше подходит по смыслу, но мне больше нравится идея с доп. состояниями для ConnectState. На досуге перепишу, в комментариях скину чего получилось.


  1. Veygard
    11.12.2023 14:39

    В "override val incoming()" разве вторая "if (isClosed){} " будет выполняется? По идее при первой уже должно в ретурн уходить.