Чем больше я читал и смотрел доклады про корутины в Kotlin, тем больше я восхищался этим средством языка. Недавно в Kotlin 1.3 вышел их стабильный релиз, а значит, настало время начать погружение и опробовать корутины в действии на примере моего существующего RxJava-кода. В этом посте мы сфокусируемся на том, как взять существующие запросы к сети и преобразовать их, заменив RxJava на корутины.



Откровенно говоря, перед тем как я попробовал корутины, я думал, что они сильно отличаются от того, что было раньше. Однако, основной принцип корутин включает те же понятия, к которым мы привыкли в реактивных потоках RxJava. Для примера давайте возьмем простую конфигурацию RxJava для создания запроса к сети из одного моего приложения:


  • Определяем сетевой интерфейс для Ретрофита, используя Rx-адаптер (retrofit2:adapter-rxjava2). Функции будут возвращать объекты из Rx-фреймворка, такие как Single или Observable. (Здесь и далее используются функции, а не методы, так как предполагается, что старый код был также написан на Kotlin. Ну или сконвертирован из Java через Android Studio).
  • Вызываем определенную функцию из другого класса (например репозитория, или активити).
  • Определяем для потоков, на каком Scheduler-е они будут выполняться и возвращать результат (методы .subscribeOn() и .observeOn()).
  • Сохраняем ссылку на объект для отписки (например в CompositeObservable).
  • Подписываемся на поток эвентов.
  • Отписываемся от потока в зависимости от событий жизненного цикла Activity.

Это основной алгоритм работы с Rx (не учитывая функции маппинга и детали других манипуляций с данными). Что касается корутин – принцип сильно не меняется. Та же концепция, меняется только терминология.


  • Определяем сетевой интерфейс для Ретрофита, используя адаптер для корутин. Функции будут возвращать Deferred объекты из API корутин.
  • Вызываем эти функции из другого класса (например репозитория, или активити). Единственное отличие: каждая функция должна быть помечен как отложенная (suspend).
  • Определяем dispatcher, который будет использован для корутина.
  • Сохраняем ссылку на Job-объект для отписки.
  • Запускаем корутин любым доступным способом.
  • Отменяем корутины в зависимости от событий жизненного цикла Activity.

Как можно заметить из приведенных выше последовательностей, процесс выполнения Rx и корутин очень похож. Если не учитывать детали реализации, это означает, что мы можем сохранить подход, который у нас есть – мы только заменяем некоторые вещи, чтобы сделать нашу реализацию coroutine-friendly.



Первый шаг, который мы должны сделать – позволить Ретрофиту возвращать Deferred-объекты. Объекты типа Deferred представляют собой неблокирующие future, которые могут быть отменены, если нужно. Эти объекты по сути представляют собой корутинную Job, которая содержит значение для соответствующей работы. Использование Deferred типа позволяет нам смешать ту же идею, что и Job, с добавлением возможности получить дополнительные состояния, такие как success или failure – что делает его идеальным для запросов к сети.


Если вы используете Ретрофит с RxJava, вероятно, вы используете RxJava Call Adapter Factory. К счастью, Джейк Вортон написал её эквивалент для корутин.


Мы можем использовать этот call adapter в билдере Ретрофита, и затем имплементировать наш Ретрофит-интерфейс так же, как было с RxJava:


private fun makeService(okHttpClient: OkHttpClient): MyService {
    val retrofit = Retrofit.Builder()
            .baseUrl("some_api")
            .client(okHttpClient)
            .addCallAdapterFactory(CoroutineCallAdapterFactory())
            .build()
    return retrofit.create(MyService::class.java)
}

Теперь посмотрим на интерфейс MyService, который использован выше. Мы должны заменить в Ретрофит-интерфейсе возвращаемые Observable-типы на Deferred. Если раньше было так:


@GET("some_endpoint")
fun getData(): Observable<List<MyData>>

То теперь заменяем на:


@GET("some_endpoint")
fun getData(): Deferred<List<MyData>>

Каждый раз, когда мы вызовем getData() – нам вернется объект Deferred – аналог Job для запросов к сети. Раньше мы как-то так вызывали эту функцию с RxJava:


override fun getData(): Observable<List<MyData>> {
    return myService.getData()
        .map { result ->
            result.map { myDataMapper.mapFromRemote(it) }
        }
}

В этом RxJava потоке мы вызываем нашу служебную функцию, затем применяем map-операцию из RxJava API с последующим маппингом данных, вернувшихся из запроса, в что-то, используемое в UI слое. Это немного поменяется, когда мы используем реализацию с корутинами. Для начала, наша функция должна быть suspend (отложенной), для того, чтобы сделать ленивую операцию внутри тела функции. И для этого вызывающая функция должна быть также отложенной. Отложенная функция – неблокирующая, и ею можно управлять после того, как она будет первоначально вызвана. Можно ее стартануть, поставить на паузу, возобновить или отменить.


override suspend fun getData(): List<MyData> {
    ...
}

Теперь мы должны вызвать нашу служебную функцию. На первый взгляд, мы выполняем то же самое, но нужно помнить, что теперь мы получаем Deferred вместо Observable.


override suspend fun getData(): List<MyData> {
    val result = myService.getData()
    ...
}

Из-за этого изменения мы не можем больше использовать цепочку map-операция из RxJava API. И даже в этой точке нам не доступны данные – мы только имеем Deferred-инстанс. Теперь мы должны использовать функцию await() для того, чтобы дождаться результата выполнения запроса и затем продолжить выполнение кода внутри функции:


override suspend fun getData(): List<MyData> {
    val result = myService.getData().await()
    ...
}

В этой точке мы получаем завершенный запрос и данные из него, доступные для использования. Поэтому мы можем теперь совершать операции маппинга:


override suspend fun getData(): List<MyData> {
    val result = myService.getData().await()
    return result.map { myDataMapper.mapFromRemote(it) }
}

Мы взяли наш Ретрофит-интерфейс вместе с вызывающим классом и использовали корутины. Теперь же мы хотим вызвать этот код из наших Activity или фрагментов и использовать данные, которые мы достали из сети.


В нашей Activity начнем с создания ссылки на Job, в которую мы сможем присвоить нашу корутинную операцию и затем использовать для управления, например отмены запроса, во время вызова onDestroy().


private var myJob: Job? = null
override fun onDestroy() {
    myJob?.cancel()
    super.onDestroy()
}

Теперь мы можем присвоить что-то в переменную myJob. Давайте посмотрим на наш запрос с корутинами:


myJob = CoroutineScope(Dispatchers.IO).launch {
    val result = repo.getLeagues()
    withContext(Dispatchers.Main) {
        //do something with result
    }
}

В этом посте я не хотел бы углубляться в Dispatchers или исполнение операций внутри корутинов, так как это тема для других постов. Вкратце, что здесь происходит:


  • Создаем инстанс CoroutineScope, используя IO Dispatcher в качестве параметра. Этот диспатчер используется для совершения блокирующих операций ввода-вывода, таких как сетевые запросы.
  • Запускаем наш корутин функцией launch – эта функция запускает новый корутин и возвращает ссылку в переменную типа Job.
  • Затем мы используем ссылку на наш репозиторий для получения данных, выполняя сетевой запрос.
  • В конце мы используем Main диспатчер для совершения работы на UI-потоке. Тут мы сможем показать полученные данные пользователям.

В следующем посте автор обещает копнуть поглубже в детали, но текущего материала должно быть достаточно для начала изучения корутинов.


В этом посте мы заменили RxJava-реализацию ответов Ретрофита на Deferred объекты из API корутин. Мы вызываем эти функции для получения данных из сети, и затем отображем их в нашем активити. Надеюсь, вы увидели, как мало изменений нужно сделать, чтобы начать работать с корутинами, и оценили простоту API, особенно в процессе чтения и написания кода.


В комментариях к оригинальному посту я нашел традиционную просьбу: покажите код целиком. Поэтому я сделал простое приложение, которое при старте получает расписание электричек с API Яндекс.Расписаний и отображает в RecyclerView. Ссылка: https://github.com/AndreySBer/RetrofitCoroutinesExample


Еще хотелось бы добавить, что корутины кажутся неполноценной заменой RxJava, так как не предлагают равноценного набора операций для синхронизации потоков. В этой связи стоит посмотреть на реализацию ReactiveX для Kotlin: RxKotlin.


Если же вы используете Android Jetpack, нашел также пример с Ретрофитом, корутинами, LiveData и MVVM: https://codinginfinite.com/kotlin-coroutine-call-adapter-retrofit/

Комментарии (3)


  1. alfaneos
    08.11.2018 14:06

    Я читал что у корутин есть преимущество в потреблении памяти перед отрытием отдельного потока в Джаве. А как в котлине с производительностью RxKotlin?


    1. AndreySBer Автор
      08.11.2018 14:11

      RxKotlin представляет собой обертку над RxJava с поддержкой Kotlin Extensions. В RxJava работа с потоками осуществляется не напрямую, а через Schedulers — пуллы из однопоточных ExecutorService-ов, что сокращает накладные расходы на открытие потока.


  1. xbitstream
    09.11.2018 20:49

    Выглядит как ремонт фасадов в России — навесить принт со стеной и окнами на руины. Если есть возможность в принципе реорганизовать работу сервиса, то почему бы не выкинуть ретрофит и заюзать котлиновские библиотеки?