Всем привет! На связи Сергей, Android-разработчик Студии Олега Чулакова на проектах Сбера.

В этой статье я хочу рассмотреть один из важнейших аспектов мобильной разработки — многопоточность. Многопоточность позволяет выполнять несколько задач одновременно и повышать производительность приложения. Далее погрузимся в следующие темы:

  1. Что такое многопоточность. 

  2. Запуск приложения системой. 

  3. Создание главного потока в приложении. 

  4. Один поток vs Использование многопоточности. 

  5. Как происходит создание нового потока. 

  6. Основные принципы и примеры использования RxJava. 

  7. Основные принципы и примеры использования Kotlin Coroutines. 

  8. Сравнение RxJava и Kotlin Coroutines. 

1. Что такое многопоточность

Перед тем как рассмотреть многопоточность в мобильной разработке, важно понимать, что такое многопоточность вообще. Многопоточность — возможность программы выполнять несколько задач одновременно в рамках одного процесса. Это позволяет эффективно использовать вычислительные ресурсы компьютера или мобильного устройства и повышать производительность приложения. В контексте мобильной разработки многопоточность может быть особенно полезна, например при загрузке данных из Сети, обработке графики или выполнении других трудоемких задач.

В процессе разработки Android-приложения любые операции, связанные с походом в Сеть, или операции обращения к локальной базе данных — другими словами, операции ввода-вывода — должны осуществляться в фоновом потоке.

Теперь, когда мы определили, что такое многопоточность, перейдем к ее использованию в мобильной разработке.

2. Запуск приложения системой

Для того чтобы понимать, откуда берется главный поток, давайте сначала разберемся, как система запускает приложение.

Представьте, что пользователь нажимает на иконку приложения на главном экране своего устройства, и в этот момент происходит следующее:

  1. ОС начинает поиск приложения, соответствующего иконке, на которую пользователь нажал. Поиск выполняется в системных папках, где находятся все установленные приложения.

  2. Когда приложение найдено, ОС запускает его процесс. Каждый запущенный процесс приложения работает в своей собственной среде, изолированной от других процессов.

  3. При запуске приложения происходит его загрузка в оперативную память. В зависимости от размера приложения и скорости устройства загрузка может занять некоторое время.

  4. После загрузки приложения начинается его инициализация, включая загрузку всех необходимых ресурсов, библиотек и файлов, которые приложение использует для своей работы.

  5. После инициализации приложение отображает свой интерфейс на экране устройства. Интерфейс может содержать различные элементы, такие как кнопки, поля ввода и текстовые блоки.

  6. Приложение ожидает действий пользователя, которые могут включать ввод текста, выбор из списка или выполнение других задач. Приложение также может взаимодействовать с другими приложениями и ОС, например, отправлять уведомления на панель уведомлений или получать данные из интернета.

3. Создание главного потока в приложении

В Android-приложении есть 4 основных компонента.

Если один из компонентов запускается и Android видит, что это первый компонент нашего приложения, то под него создается новый процесс с одним потоком, который является точкой входа. Этот поток называют «Главный поток», «Main thread» или «UI thread».

Главный поток в операционной системе Android применяется для выполнения задач, связанных с пользовательским интерфейсом (UI), таких как обработка пользовательских событий и обновление пользовательского интерфейса. Взаимодействие пользователя с приложением происходит в основном через UI, и это наиболее важная часть приложения, поэтому главный поток называют UI-потоком.

Если главный поток будет блокироваться длительное время, то пользовательский интерфейс не будет отзываться на действия пользователя, и приложение может показать диалоговое окно о том, что оно не отвечает. Поэтому очень важно не блокировать главный поток и не выполнять в нем длительных операций, чтобы пользоваться приложением без задержек.

4. Один поток VS Использование многопоточности

1-й пример. В приложении долгое и тяжелое вычисление выполняется в главном потоке, поэтому наш UI thread блокируется, и приложение перестает отвечать.

2-й пример. В приложении долгое и тяжелое вычисление выполняется в фоновом потоке. Мы не загружаем наш главный поток, поэтому приложение продолжает отвечать на действия пользователя.

5. Создание нового потока

Одним из ключевых аспектов многопоточности в мобильной разработке является создание новых потоков. Создание нового потока — это процесс, который позволяет выполнять определенный код параллельно с основным потоком выполнения приложения. Такой подход может значительно улучшить отзывчивость приложения и общую производительность. Однако важно знать, как правильно создавать новые потоки и как эти потоки будут взаимодействовать между собой и с основным потоком приложения. Рассмотрим более подробно, как происходит создание нового потока.

Создавать поток в процессе разработки можно разными способами, в которые мы не будем углубляться. Просто определим, что у нас есть класс Thread, который мы запускаем на выполнение. В этот момент происходит следующее:

  1. Выделение памяти для потока: для нового потока выделяется память в размере 1 Мб, которая будет использоваться для хранения переменных, данных и другой информации, связанной с потоком.

  2. Инициализация потока: созданный объект Thread инициализируется, устанавливая параметры, такие как приоритет, имя и другие свойства, связанные с потоком.

  3. Запуск потока: приложение запускает новый поток, вызывая метод start() объекта Thread. Этот метод запускает новый поток и вызывает метод run() в этом потоке.

После запуска потока метод run() в объекте Thread начинает выполнять код, который был передан ему при создании потока.

При окончании работы метода run() в объекте Thread поток завершается. Приложение может выполнять дополнительные действия после завершения потока, такие как очистка ресурсов, освобождение памяти и т.д.

Здесь важно отметить, что создание нового потока является тяжелой операцией, требующей ресурсы от системы, поэтому не стоит бездумно создавать новые потоки, а следует подходить к этому с умом, так как формирование нового потока может быть ущербнее для приложения, чем выполнение двух строчек кода в созданном потоке.

6. Основные принципы и примеры использования RxJava

После того как мы рассмотрели концепцию многопоточности в мобильной разработке и создание новых потоков, давайте перейдем к одной из популярных библиотек для разработки Android-приложений на языке Java и Kotlin — RxJava.

RxJava — это библиотека реактивного программирования, которая позволяет разрабатывать асинхронный и реактивный код. Идея реактивности построена на паттерне проектирования Observer.

У нас есть подписчики и то, на что мы подписываемся. После подписки, как только появляется новое сообщение, всем подписчикам приходит notify, то есть уведомление. Это базовый паттерн.

В данной схеме есть:

  • Subject —  тот, кто публикует новые сообщения;

  • Observer —  тот, кто на них подписан. В реактивных потоках подписчик обычно называется Subscriber. Термины разные, но, по сути, это одно и то же. В большинстве сообществ более привычны термины Publisher/Subscriber.

Это базовая идея, на которой все строится. 

RxJava предоставляет набор операторов, которые позволяют создавать и преобразовывать последовательности данных, называемые Observable. Observable могут испускать различные типы событий, такие как данные, ошибки или завершение.

Один из ключевых аспектов RxJava — это возможность управлять асинхронными операциями и реактивно реагировать на события. С помощью операторов RxJava можно создавать цепочки операций над данными, объединять, фильтровать, преобразовывать и комбинировать последовательности. Это позволяет удобно обрабатывать асинхронные задачи, такие как сетевые запросы, обработка данных в фоновом режиме и управление параллельными вычислениями.

RxJava также обладает мощным механизмом управления подписками, который позволяет контролировать жизненный цикл потоков данных. Вы можете создавать подписки, отменять их, обрабатывать ошибки и освобождать ресурсы, связанные с потоком данных, когда они больше не нужны.

Другим важным аспектом RxJava является возможность обработки ошибок и исключений. RxJava предоставляет операторы для обработки ошибок и восстановления после них, что упрощает обработку и управление ошибками в асинхронной среде.

В RxJava существуют два типа источников данных: горячие (hot) и холодные (cold). Разница между ними заключается в том, как и когда данные начинают эмитироваться и как управляется их поток.

Горячие источники данных (Hot Observables)

Горячие источники начинают эмитировать данные независимо от наличия подписчиков. Если наблюдатель подписывается на горячий источник, он присоединяется к потоку данных в текущий момент времени. Это означает, что наблюдатель может пропустить некоторые данные, которые были эмитированы до его подписки. Горячие источники используются, например, для событий в реальном времени или для общего доступа к одному потоку данных из нескольких наблюдателей. 

В RxJava существует несколько типов горячих источников данных. Ниже подробное описание каждого из них. 

1. PublishSubject

Эмитирует только те элементы, которые были добавлены после подписки наблюдателя. Новые подписчики получают только последующие элементы после своей подписки. Не сохраняет предыдущие элементы и не повторяет их для новых подписчиков. Если подписчик пропускает события, он не получит их после своей подписки.

Пример:

// Создание PublishSubject
val publishSubject = PublishSubject.create<Int>()

// Подписка на PublishSubject
val disposable = publishSubject.subscribe(
  // onNext - обработка нового значения
  { value -> println("Получено значение: $value") },
  // onError - обработка ошибки
  { error -> println("Ошибка: ${error.message}") },
  // onComplete - обработка завершения последовательности
  { println("Последовательность завершена") }
)

// Эмитирование событий
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onNext(3)

// Генерация ошибки
publishSubject.onError(Exception("Ошибка в потоке данных"))

// Завершение последовательности
publishSubject.onComplete()

// Отписка от PublishSubject
disposable.dispose()

В этом примере мы:

  1. создаем PublishSubject с помощью метода PublishSubject.create();

  2. подписываемся на PublishSubject, используя метод subscribe(). В методе subscribe() мы передаем три аргумента: функцию для обработки нового значения (onNext), функцию для обработки ошибки (onError) и функцию для обработки завершения последовательности (onComplete);

  3. эмитируем несколько событий с помощью метода onNext() для передачи значений в PublishSubject;

  4. генерируем ошибку, используя метод onError(), чтобы проверить обработку ошибок;

  5. завершаем последовательность с помощью метода onComplete(), чтобы указать, что больше значений не будет;

  6. отписываемся от PublishSubject, вызывая метод dispose() для освобождения ресурсов, связанных с подпиской.

При выполнении данного кода мы увидим следующий вывод в консоли:

I/System.out: Получено значение: 1

I/System.out: Получено значение: 2

I/System.out: Получено значение: 3

I/System.out: Ошибка: Ошибка в потоке данных

Важно отметить, что мы не увидим сообщение «Последовательность завершена», потому что после генерации ошибки с помощью метода onError() последовательность PublishSubject считается завершенной и дальнейшие события не будут доставлены наблюдателям.

2. BehaviorSubject

Горячий источник, который хранит последний элемент и эмитирует его немедленно при подписке нового наблюдателя.

Новые подписчики получают последний эмитированный элемент и все последующие элементы.

Подписчики, которые пропустили некоторые элементы, получат только последний элемент и все последующие после своей подписки.

Пример:

// Создание BehaviorSubject
val behaviorSubject = BehaviorSubject.createDefault(0)

// Эмитирование событий
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)

// Подписка на BehaviorSubject
val disposable = behaviorSubject.subscribe(
  // onNext - обработка нового значения
  { value -> println("Получено значение: $value") },
  // onError - обработка ошибки
  { error -> println("Ошибка: ${error.message}") },
  // onComplete - обработка завершения последовательности
  { println("Последовательность завершена") }
)

// Эмитирование событий
behaviorSubject.onNext(3)

// Завершение последовательности
behaviorSubject.onComplete()

// Отписка от BehaviorSubject
disposable.dispose()

При выполнении данного кода мы увидим следующий вывод в консоли:

I/System.out: Получено значение: 2

I/System.out: Получено значение: 3

I/System.out: Последовательность завершена

3. ReplaySubject

Горячий источник, который сохраняет все или ограниченное количество предыдущих элементов и повторяет их при подписке нового наблюдателя.

Новые подписчики получают все сохраненные предыдущие элементы и все последующие элементы.

Можно настроить размер буфера для определения количества предыдущих элементов, которые нужно сохранить и повторить.

Пример:

// Создание ReplaySubject с размером буфера 2
val replaySubject = ReplaySubject.createWithSize<Int>(2)

// Эмитирование событий
replaySubject.onNext(1)
replaySubject.onNext(2)
replaySubject.onNext(3)

// Подписка на ReplaySubject
val disposable = replaySubject.subscribe(
  // onNext - обработка нового значения
  { value -> println("Получено значение: $value") },
  // onError - обработка ошибки
  { error -> println("Ошибка: ${error.message}") },
  // onComplete - обработка завершения последовательности
  { println("Последовательность завершена") }
)

// Эмитирование событий
replaySubject.onNext(4)

// Завершение последовательности
replaySubject.onComplete()

// Отписка от ReplaySubject
disposable.dispose()

При выполнении данного кода мы увидим следующий вывод в консоли:

I/System.out: Получено значение: 2

I/System.out: Получено значение: 3

I/System.out: Получено значение: 4

I/System.out: Последовательность завершена

4. AsyncSubje

Горячий источник, который эмитирует только последний элемент и только после завершения последовательности.

Новые подписчики получают лишь последний элемент, но только после того, как последовательность завершена.

Этот тип источника полезен, когда важен результат или окончательное значение и все предыдущие элементы не играют роли.

Пример:

// Создание AsyncSubject
val asyncSubject = AsyncSubject.create<Int>()

// Подписка на AsyncSubject
val disposable = asyncSubject.subscribe(
  // onNext - обработка нового значения
  { value -> println("Получено значение: $value") },
  // onError - обработка ошибки
  { error -> println("Ошибка: ${error.message}") },
  // onComplete - обработка завершения последовательности
  { println("Последовательность завершена") }
)

// Эмитирование событий
asyncSubject.onNext(1)
asyncSubject.onNext(2)
asyncSubject.onNext(3)

При выполнении указанного кода мы не увидим ничего в выводе, потому что AsyncSubject возвращает только последнее значение из последовательности лишь после завершения.

В данном примере кода после эмитирования значений 1, 2 и 3 с помощью метода onNext() мы не завершаем последовательность с помощью метода onComplete(). 

Чтобы увидеть значения в выводе, мы должны вызвать метод onComplete(), чтобы AsyncSubject завершился и передал последнее значение наблюдателям.

Каждый из этих горячих источников в RxJava имеет свои особенности и подходит для разных сценариев использования. Применяя их, можно создавать мощные потоки данных, управлять подписками и обеспечивать доставку событий различным наблюдателям в асинхронной среде.

В представленных выше примерах мы видим, что результатом подписки на любой из источников данных в RxJava является Disposable.

Disposable в RxJava — объект, который представляет подписку на поток данных и позволяет контролировать ее состояние и отписываться от нее. Disposable предоставляет метод dispose(), который позволяет прекратить подписку и освободить ресурсы.

В контексте Android управление Disposable играет важную роль для предотвращения утечек памяти и правильного управления жизненным циклом подписки. Особенно важно отписываться от подписок, когда связанный с ними компонент, например ViewModel, уничтожается.

Чтобы правильно очистить Disposable, можно использовать подход, основанный на применении CompositeDisposable. CompositeDisposable представляет собой контейнер, который хранит и управляет несколькими Disposable.

private val compositeDisposable = CompositeDisposable()

val disposable = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe { result -> println(result) }
    
compositeDisposable.add(disposable)

У Disposable имеются методы clear() и dispose(). Они являются двумя разными способами для отписки от подписок и освобождения ресурсов. 

compositeDisposable.clear(). Этот метод используется для отписки от всех активных подписок, содержащихся в CompositeDisposable, и очищает контейнер, удаляя все Disposable из него. Однако сам CompositeDisposable остается активным и может продолжать применяться для добавления новых Disposable.

compositeDisposable.dispose(). Этот метод используется для окончательной отписки от всех подписок в CompositeDisposable и освобождения всех ресурсов, связанных с ними. После вызова этого метода CompositeDisposable больше не может быть использован для добавления новых Disposable.

Использование clear() или dispose() зависит от ваших потребностей и контекста приложения. Если вы хотите отписаться от подписок и продолжить использовать CompositeDisposable для добавления новых Disposable в будущем, используйте clear(). Если вы больше не планируете использовать CompositeDisposable и хотите полностью освободить ресурсы, используйте dispose().

Холодные источники данных (Cold Observables)

Холодные источники данных начинают эмитировать данные только тогда, когда на них подписывается наблюдатель (Observer). Каждый наблюдатель получает полную последовательность данных с начала. Это означает, что данные не будут теряться, даже если наблюдатель подписывается позже. Холодные источники в RxJava создаются, например, с помощью операторов Observable.create() или Observable.fromIterable().

Пример холодного источника:

val coldObservable: Observable<Int> = Observable.fromIterable(listOf(1, 2, 3, 4, 5))

coldObservable.subscribe(
            {
                println(it)
            },
            {
                Log.d("TAG", it.toString())
            }
)

В этом примере создается холодный источник данных (Cold Observable). Давайте рассмотрим, что происходит, более подробно. 

  1. Преобразование в Observable: Observable.fromIterable() принимает список и преобразует его в Observable. Каждый элемент списка становится отдельным событием, которое будет эмитироваться Observable.

  1. Подписка на Observable: после создания Observable можно подписаться на него с помощью метода subscribe(). При подписке будет создан наблюдатель (Observer), который будет слушать эмитируемые события.

  1. Эмитирование событий: после подписки на Observable каждый элемент списка будет последовательно эмитироваться в порядке их появления. В данном случае 1 будет первым событием, затем 2, 3, 4 и, наконец, 5.

  1. Обработка событий: подписанный наблюдатель будет обрабатывать каждое эмитированное событие, выполняя определенные действия в соответствии с логикой, которая была определена в его методах onNext(), onError() и onComplete().

  1. Завершение последовательности: после того как все элементы списка были эмитированы, Observable автоматически завершится и вызовет метод onComplete() у наблюдателя, чтобы указать, что последовательность завершена.

  1. В итоге при подписке на данный Observable наблюдатель будет получать последовательность событий, соответствующих элементам списка [1, 2, 3, 4, 5], и выполнять обработку каждого события.

В RxJava, чтобы работать с асинхронными потоками данных, можно использовать различные типы Cold Observable, такие как Single, Completable, Maybe, Observable и Flowable. Эти типы предоставляют удобный и понятный способ работы с потоками данных. Давайте рассмотрим типы Observable более подробно и примеры их использования в мобильной разработке.

Single представляет Observable, которые выдают одно значение или ошибку. Single может быть подходящим, когда у вас есть задача, ориентированная на наблюдаемое, и вы ожидаете одно значение, например сетевой запрос, который выполняется один раз, и возвращаемое значение (или ошибка). Сетевой вызов работает один раз, что означает, что вы не ожидаете, что он вернет дополнительные значения с течением времени. Другой пример — операция выборки данных из БД.

Completable представляет Observable, которые не выдают никакого значения, а только терминальные события, либо onError или onCompleted. Completable уместен, когда у вас есть Observable, а значение возвращаемого результата неинтересно. Пример — обновление кэша: допустим, операция может завершиться успешно / завершиться неудачно, но не возвращает никакого значения. Другим примером является некоторая длительная операция инициализации, которая ничего не возвращает. Это может быть сетевой вызов UPDATE/PUT, который привел только к индикации успеха.

С Maybe мы можем либо иметь некоторое значение, точно так же как Single, либо ничего не возвращать, так же как Completable. Кроме того, как и у обоих, у нас есть ошибка. Maybe ценно, когда мы хотим отметить, что Observable может не иметь значения и будет просто завершен.

Flowable и Observable устроены очень похоже. Оба генерируют от нуля до n элементов. Оба могут завершаться успешно или с ошибкой.

Все сводится к такой штуке, как backpressure. Не углубляясь в подробности, скажу лишь, что backpressure позволяет замедлить работу источника данных. Существующие системы имеют ограниченные ресурсы. И с помощью backpressure мы можем сказать всем, кто шлет нам данные, чтобы они притормозили, потому что мы не можем обрабатывать информацию с той скоростью, с которой она к нам поступает.

Schedulers

Еще одним важным аспектом работы с многопоточностью в RxJava являются Schedulers. Schedulers позволяют управлять тем, на каком потоке будут выполняться операции Observable и Subscriber. RxJava предоставляет несколько встроенных Schedulers, а также позволяет создавать собственные Schedulers. Рассмотрим более подробно, как работают Schedulers в RxJava и какие есть возможности для управления потоками выполнения в мобильной разработке.

Schedulers.io(). Этот планировщик основывается на неограниченном пуле потоков и используется для интенсивной работы с вводом-выводом, например доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и т.д. Количество потоков в этом планировщике неограниченно и может расти по мере необходимости.

Schedulers.computation(). Этот планировщик используется для выполнения работы, высоко нагружающей ЦП, такой как обработка больших объемов данных, изображений и т.д. Планировщик основывается на ограниченном пуле потоков — по количеству доступных ядер. Так как этот планировщик подходит только для интенсивной работы с ЦП, количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.

Schedulers.newThread(). Этот планировщик создает совершенно новый поток при каждом вызове. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести к замедлению работы системы и переполнению памяти. Новый поток будет создаваться для обработки каждого элемента, полученного из observable-источника. В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.

Schedulers.single(). Этот планировщик основывается на единственном потоке, который используется для последовательного выполнения задач. Он может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.

Schedulers.from(Executor executor). Этот планировщик будет основываться на вашем собственном Executor. Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков. Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков (Scheduler.from(Executors.newFixedThreadPool(n))), и использовать его во всех местах, связанных с сетевыми вызовами.

AndroidSchedulers.mainThread(). Это специальный планировщик, который недоступен в библиотеке RxJava. Необходимо использовать расширяющую библиотеку RxAndroid для доступа к этому планировщику. Он полезен в android-приложениях для выполнения действий в потоке пользовательского интерфейса. По умолчанию этот планировщик ставит задания в очередь в Looper, связанный с основным потоком, но есть возможность переопределения — AndroidSchedulers.from(Looper looper).

Давайте посмотрим, как будет выглядеть подписка на источник данных:

private fun fetchUsers(): Single<List<User>> {
        return apiService.getUsers()
}

Эта функция возвращает холодный источник данных — Single.

fetchUsers()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe { screenState.value = ScreenState.Loading }
        .subscribe(
            { users ->
                screenState.value = ScreenState.Content(data = users)
            },
            { throwable ->
                screenState.value = ScreenState.Error(throwable = throwable)
            }
         )

Этот участок кода выполняет асинхронный запрос для получения пользователей и обрабатывает полученные данные. Давайте пошагово рассмотрим, что происходит в этом участке кода:

  1. функция fetchUsers() возвращает Single<List<User>>;

  1. subscribeOn(Schedulers.io()) устанавливает планировщик выполнения операции подписки на io(), что означает, что операция будет выполняться на одном из потоков, предназначенных для операций ввода-вывода;

  1. observeOn(AndroidSchedulers.mainThread()) устанавливает планировщик для обработки данных на главном потоке Android, то есть все последующие операции будут выполняться на главном потоке;

  1. doOnSubscribe { screenState.value = ScreenState.Loading } вызывает блок кода doOnSubscribe, который устанавливает значение screenState в ScreenState.Loading — это событие происходит перед началом подписки на операцию;

  1. subscribe() вызывает подписку, которая принимает два аргумента: лямбда-выражение для обработки успешного результата и лямбда-выражение для обработки ошибки. В случае успешного получения пользователей значение screenState устанавливается в ScreenState.Content(data = users). В случае возникновения ошибки, значение screenState устанавливается в ScreenState.Error(throwable = throwable).

Что касается особенности работы subscribeOn и observeOn, то они определяют, на каких потоках выполняются различные этапы операции подписки.

subscribeOn устанавливает поток выполнения для операции подписки (в данном случае io()). Это означает, что все этапы операции, начиная с создания и отправки запроса, будут выполняться на указанном потоке. Принцип работы этой функции можно описать как «снизу вверх», поскольку поток выполнения устанавливается на самой нижней точке цепочки операций.

observeOn устанавливает поток выполнения для обработки результатов операции подписки (в данном случае mainThread()). Это означает, что все последующие операции, включая обработку успешного результата или ошибки, будут выполняться на указанном потоке. Принцип работы этой функции можно описать как «сверху вниз», поскольку поток выполнения устанавливается на более высокой точке цепочки операций.

Таким образом, subscribeOn и observeOn позволяют контролировать поток выполнения операций подписки и обработки результатов, обеспечивая гибкость и возможность правильного распределения нагрузки на различные потоки в асинхронных операциях.

Промежуточные операторы в RxJava

В RxJava есть большое количество промежуточных операторов для преобразования, фильтрации и комбинирования этих данных. Промежуточные операторы позволяют изменять элементы в потоке данных, фильтровать их, объединять и трансформировать, чтобы получить нужный результат. Они позволяют осуществлять преобразования, которые требуются для выполнения конкретных задач и достижения целей вашего приложения.

Например, мы можем использовать оператор map для преобразования элементов в потоке данных в другой тип данных или для применения определенной логики к каждому элементу.

Observable.range(1, 5)
    .map(number -> number * 2)
    .subscribe(result -> println(result))
    
// Выводит: 2, 4, 6, 8, 10

filter позволяет отфильтровывать элементы на основе заданного условия и передавать только те элементы, которые удовлетворяют этому условию.

Observable.range(1, 5)
    .filter(number -> number % 2 == 0)
    .subscribe(result -> println(result))
    
// Выводит: 2, 4

flatMap и concatMap позволяют преобразовывать элементы в потоке данных в другие Observable и объединять результаты в один поток. Например, мы можем использовать flatMap для выполнения асинхронных операций для каждого элемента входного потока данных, возвращая результаты в порядке их завершения.

Observable.range(1, 3)
    .flatMap(number -> Observable.range(1, number))
    .subscribe(result -> println(result))

// Выводит: 1, 1, 2, 1, 2, 3

zip комбинирует элементы из нескольких Observable в соответствии с определенной функцией и передает результаты в виде пары или кортежа.

val numbers = Observable.range(1, 3);
val letters = Observable.just("A", "B", "C")

Observable.zip(numbers, letters, (number, letter) -> number + letter)
    .subscribe(result -> println(result))

// Выводит: "1A", "2B", "3C"

merge объединяет элементы из нескольких Observable в один поток данных без определенного порядка.

val source1 = Observable.just(1, 2, 3)
val source2 = Observable.just(4, 5, 6)

Observable.merge(source1, source2)
    .subscribe(result -> println(result))

// Выводит: 1, 2, 3, 4, 5, 6

distinct удаляет повторяющиеся элементы из потока данных.

Observable.just(1, 2, 2, 3, 3, 3)
    .distinct()
    .subscribe(result -> println(result))
    
// Выводит: 1, 2, 3

take берет только указанное количество элементов из потока данных.

Observable.range(1, 5)
    .take(3)
    .subscribe(result -> println(result))

// Выводит: 1, 2, 3

skip пропускает указанное количество элементов в потоке данных и передает остальные элементы.

Observable.range(1, 5)
    .skip(2)
    .subscribe(result -> println(result))

// Выводит: 3, 4, 5

debounce подавляет быстро идущие друг за другом элементы в потоке данных и передает только последний элемент за определенный период времени.

Observable.create { emitter ->
    emitter.onNext(1);
    Thread.sleep(100);
    emitter.onNext(2);
    Thread.sleep(300);
    emitter.onNext(3);
}
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe(result -> println(result))

// Выводит: 2, 3

Это лишь некоторые из операторов, доступных в RxJava. Они предоставляют мощные возможности для манипуляции с потоками данных и обработки асинхронных событий в android-приложениях.

При использовании промежуточных операторов важно обратить внимание на выбор планировщика (Scheduler) для выполнения операций. Планировщик позволяет управлять контекстом выполнения операторов, определяя, в каком потоке будет производиться код оператора. Это важно для обеспечения правильного управления потоками и предотвращения блокировки пользовательского интерфейса (UI) или других проблем с многопоточностью.

Заключение к блоку о RxJava

Мы рассмотрели ключевые аспекты использования RxJava в Android-разработке. RxJava основана на реактивном программировании, которое строится на основе паттерна Observable. 

Одним из важных аспектов RxJava являются горячие и холодные источники данных. Горячие источники уже активно излучают данные, независимо от наличия подписчиков. При подписке на горячий источник подписчик получает только новые данные, пропуская предыдущие. С другой стороны, холодные источники начинают излучать данные только после подписки каждого отдельного подписчика, что позволяет каждому подписчику получать полный набор данных.

Schedulers в RxJava предоставляют механизм управления потоками выполнения операций. Они позволяют нам указывать, на каком потоке будет выполняться код для Observable и подписчиков.

Промежуточные операторы в RxJava предоставляют множество функциональности для преобразования и фильтрации данных.

В итоге RxJava предоставляет нам мощный инструментарий для работы с асинхронными операциями и управления потоками данных в Android-приложениях.

7. Основные принципы и примеры использования Kotlin Coroutines

Кроме RxJava, еще одним популярным инструментом для работы с многопоточностью в мобильной разработке являются Kotlin Coroutines. Они позволяют писать асинхронный код в стиле синхронного, что делает его более понятным и легким в поддержке. Kotlin Coroutines также предоставляют удобные и понятные средства для управления жизненным циклом асинхронных операций и позволяют управлять потоками выполнения кода. Давайте рассмотрим более подробно, как работают Kotlin Coroutines и как они используются в мобильной разработке.

Операции в корутинах являются сопрограммами. Это легковесные потоки, которые выполняются в контексте реальных потоков. Это означает, что они не создают дополнительную нагрузку на систему, так как не являются отдельными потоками.

CoroutineScope

CoroutineScope — это основной компонент для управления корутинами в Kotlin. Он предоставляет API для запуска и отмены корутин и позволяет определять, на каком потоке должны выполняться операции. CoroutineScope также позволяет управлять жизненным циклом корутин и предотвращает утечки памяти. Рассмотрим более подробно, как работает CoroutineScope и как он используется в мобильной разработке.

CoroutineScope — это интерфейс, который предоставляет функции для запуска и управления корутинами. Корутины запускаются внутри CoroutineScope, который определяет их жизненный цикл и контекст выполнения.

Примеры использования CoroutineScope в Android-разработке:

GlobalScope — это глобальный CoroutineScope, который может быть использован для запуска корутин в приложении.

Он не связан с жизненным циклом компонентов Android и продолжает выполнение корутин, даже если активность или фрагмент были уничтожены.

Пример использования GlobalScope:

GlobalScope.launch {
    // Код корутины
}

Однако GlobalScope также имеет свои опасности, так как он не ограничен жизненным циклом и может привести к утечкам памяти, если корутины не будут явно отменены или завершены.

viewModelScope — это CoroutineScope, связанный с жизненным циклом ViewModel. Он автоматически отменяет все связанные с ним корутины при уничтожении ViewModel.

Пример использования viewModelScope внутри ViewModel:

class MyViewModel : ViewModel() {
    fun doSomething() {
        viewModelScope.launch {
            // Код корутины
        }
    }
}

lifecycleScope является удобным способом создания CoroutineScope, связанного с жизненным циклом компонента LifecycleOwner (например фрагмента или активности).

Преимуществом использования lifecycleScope является автоматическая отмена корутин. Корутины, запущенные в lifecycleScope, автоматически отменяются при уничтожении фрагмента или активности. Вам не нужно беспокоиться о явной отмене корутин: это происходит автоматически.

Вот пример использования lifecycleScope в корутинах:

class MyFragment : Fragment() {

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        
        viewLifecycleOwner.lifecycleScope.launch {
            // Код корутины
        }
    }

    // Остальной код фрагмента
}

CoroutineScope, созданный локально:

class MyFragment : Fragment() {

    private val myCoroutineScope = CoroutineScope(Dispatchers.Main)

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        myCoroutineScope.launch {
            // Код корутины
        }
    }

    override fun onDestroyView() {
        super.onDestroyView()
        myCoroutineScope.cancel()
    }

    // Остальной код фрагмента
}

Обратите внимание, что при создании своего CoroutineScope вам необходимо самостоятельно отслеживать и отменять корутины. Убедитесь, что вы правильно отменяете свой CoroutineScope, чтобы избежать утечек памяти и неправильного поведения вашего приложения.

Использование собственного CoroutineScope особенно полезно, когда вам нужно управлять выполнением корутин в определенной области или жизненном цикле вашего компонента.

Suspend

private suspend fun fetchUsers(): List<User> {
        return apiService.getUsers()
}

Ключевое слово suspend в Kotlin используется для обозначения функций, которые приостанавливают выполнение, но не блокируют поток выполнения. В контексте Kotlin Coroutines это означает, что функции с пометкой suspend могут быть приостановлены 

в процессе своего выполнения и возобновлены позже без блокировки основного потока выполнения.

Функции, помеченные как suspend, могут выполнять длительные операции ввода-вывода или другие блокирующие операции, не блокируя при этом основной поток выполнения. Вместо этого они могут приостанавливать свое выполнение и дать возможность другим сопрограммам его продолжить.

Функции с ключевым словом suspend могут быть вызваны только из корутин или других функций с ключевым словом suspend. Это делает код более безопасным и позволяет избежать проблем, связанных с блокировкой потоков.

Continuation

В качестве примера используем корутину:

launch {
    val data = fetchUsers() //suspend function

    println(data.toString())
}

Когда мы пишем код на Kotlin с использованием сопрограмм (корутин), этот код будет преобразован в java-классы при компиляции. Корутины не являются чем-то магическим, а просто абстракцией, которая реализована внутри этих классов.

Одной из ключевых составляющих корутин является ключевое слово suspend. Оно позволяет функции приостанавливать свое выполнение без блокировки потока, в котором она вызывается. Вместо этого функция будет выполняться в отдельном потоке, позволяя другому коду продолжать работу. Когда функция завершит свою работу, она возобновит выполнение кода, и функция, которая вызвала suspend-функцию, продолжит свое выполнение.

Когда kotlin-код преобразуется в java-код, для каждой корутины будет создан соответствующий java-класс, называемый Continuation. Этот класс содержит всю логику приостановки и возобновления выполнения корутины. Внутри Continuation используется механизм сохранения состояния и передачи управления между выполнением suspend-функций.

Для лучшего понимания механизма работы корутин и suspend-функций мы начнем с упрощенной версии Continuation, из которой исключим множество деталей, чтобы постепенно добавить их и получить более полный и оригинальный код. Это поможет нам разобраться в принципах работы корутин и понять, что под капотом происходит обычный код на Java.

Основная задача Continuation — сделать так, чтобы println был выполнен только после того, как функция fetchUsers завершит загрузку пользователей в фоновом потоке. Обычно в Java в таких случаях используется колбэк, но Continuation идет другим путем. Код в методе invokeSuspend делится свитчем на две части. И добавляется переменная label. 

class GeneratedContinuationClass extends SuspendLambda {

    List<User> users;

    int label;
 
    void invokeSuspend() {
    switch (label) {
        case 0: {
	label = 1;
            users = fetchUsers(); // suspend function
            return;
        }
        case 1: {
            println(users.toString());
            return;
        }
    }

}

Suspend-функция, в нашем случае fetchUsers, поделит выполнение кода на две части. Код, который находится в ней и перед вызовом этой функции, будет относиться к первой части, в то время как код, который следует после вызова функции, будет относиться ко второй части.

В зависимости от значения переменной label, переданной в метод invokeSuspend, будет выполнена соответствующая часть кода. Если label равен 0, то выполнится первая часть кода (fetchUsers). Если label равен 1, то выполнится вторая часть кода (println). Это позволяет отделить вызов fetchUsers от вызова println.

Первый вызов метода invokeSuspend происходит при запуске корутины. В этом вызове будет выполнена первая часть кода (в случае label, равного 0), а затем будет запущена suspend-функция. После завершения работы suspend-функции необходимо сделать второй вызов метода invokeSuspend, чтобы выполнить вторую часть кода, то есть вызов println.

Однако перед вторым вызовом invokeSuspend необходимо изменить значение переменной label с 0 на 1. Это можно сделать в первой части кода перед вызовом suspend-функции.

Так мы рассмотрели основные концепции, связанные с suspend-функциями и Continuation.

Dispatchers

По аналогии с Schedulers в RxJava для управления потоками выполнения операций в Kotlin Coroutines используются так называемые Dispatchers. Dispatchers позволяют указывать, на каком потоке должна выполняться каждая корутина. Kotlin предоставляет несколько встроенных диспетчеров, а также позволяет создавать свои собственные диспетчеры. Рассмотрим более подробно, как работают Dispatchers и как они используются в мобильной разработке.

Существует несколько встроенных диспетчеров, доступных в Kotlin. 

Dispatchers.Main — это диспетчер, который используется для выполнения корутин в главном потоке Android. Он должен применяться для всех операций, которые изменяют пользовательский интерфейс, таких как обновление View, Toast и т.д.

Dispatchers.IO — диспетчер, который используется для ввода-вывода (I/O) операций, таких как чтение или запись файлов, сетевые операции и т.д. Он также имеет доступ к пулу потоков с несколькими потоками.

Dispatchers.Default — это диспетчер, который используется по умолчанию. Он предназначен для выполнения вычислительных задач и имеет доступ к пулу потоков с несколькими потоками. Если вы не указываете явно диспетчер для корутины, она будет выполнена на диспетчере Default.

Dispatchers.Unconfined — это диспетчер, который не ограничивает выполнение корутины каким-либо конкретным потоком. Корутина будет продолжена на том же потоке, на котором была запущена. Этот диспетчер должен использоваться только в очень ограниченном числе случаев, когда корутина может быть запущена и продолжена на любом потоке.

viewModelScope.launch(Dispatchers.IO) {
            screenState.postValue(ScreenState.Loading)
            val users = fetchUsers()
            screenState.postValue(ScreenState.Content(data = users))
}

Давайте подробно разберем, что происходит на данном участке кода:

  1. В viewModelScope при помощи корутин-билдера launch создается новая корутина. Корутина будет выполняться в контексте Dispatchers.IO, что означает выполнение на фоновом потоке.

  2. Внутри созданной корутины начинается асинхронный блок кода, который будет выполняться параллельно с основным потоком.

  3. С помощью screenState.postValue(ScreenState.Loading) изменяется состояние screenState на Loading. Это означает, что экран или интерфейс должен отобразить состояние загрузки данных.

  4. Затем выполняется вызов функции fetchUsers(), которая выполняет операцию получения пользователей из какого-то источника данных (например сетевого запроса или базы данных). Возвращаемое значение сохраняется в переменную users.

  5. Далее с помощью screenState.postValue(ScreenState.Content(data = users)) состояние screenState изменяется на Content с передачей данных пользователей. Это означает, что экран или интерфейс должен отобразить полученные данные.

В заключение предлагаю перейти к сравнению RxJava и Kotlin Coroutines, исходя из рассмотренного материала.

Мы рассмотрели различные аспекты работы с многопоточностью в мобильной разработке, начиная от базовых понятий многопоточности и создания потоков в приложении и заканчивая более продвинутыми инструментами, такими как RxJava и Kotlin Coroutines.

Изучили основные принципы работы с потоками в RxJava и Kotlin Coroutines, а также рассмотрели возможности управления потоками выполнения кода с помощью Schedulers и Dispatchers. Кроме того, изучили различные типы Observable в RxJava.

В итоге мы пришли к выводу, что выбор инструмента для работы с многопоточностью зависит от конкретной задачи и потребностей приложения и что RxJava и Kotlin Coroutines представляют собой два мощных инструмента для работы с асинхронными операциями в мобильной разработке.

Комментарии (0)