«Корутины - это легковесные потоки», сколько раз вы слышали эту формулировку? Она что-нибудь вам говорит? Скорее всего не очень много. Если вы хотите узнать больше о том, как на самом деле корутины выполняются в рантайме Android, как они связаны с потоками, а также о проблемах параллелизма, которые неизбежны при использовании потоковой модели языка программирования Java, то эта статья для вас.

Корутины и потоки

Корутины призваны упростить код, который выполняется асинхронно. Когда мы говорим о корутинах в рантайме Android, блок кода, переданный в качестве лямбды в билдер корутин, в конечном итоге выполняется в определенном потоке. Например, это простое вычисление Фибоначчи:

// Корутина, которая вычисляет 10-й элемент ряда Фибоначчи в фоновом потоке
someScope.launch(Dispatchers.Default) {
   val fibonacci10 = synchronousFibonacci(10)
   saveFibonacciInMemory(10, fibonacci10)
}

private fun synchronousFibonacci(n: Long): Long { /* … */ }

Вышеупомянутый асинхронный (async) блок кода корутины, который выполняет синхронное блокирующее вычисление фибоначчи и сохраняет его в памяти, диспатчится и планируется для выполнения в пуле потоков (thread pool), управляемом библиотекой корутин, настроенной для Dispatchers.Default. Код будет выполняться в потоке из пула потоков в какой-то момент в будущем в зависимости от его политик.

Обратите внимание, что приведенный выше код полностью выполняется в одном потоке, потому что его нельзя приостановить. Корутина может выполняться в разных потоках, если выполнение перемещается в другой диспатчер (dispatcher), или если блок содержит код, который можно передавать (yield)/приостанавливать (suspend) в диспtтчере, который использует пул потоков.

Точно так же вы можете выполнить приведенную выше логику без корутин с помощью прямого использования потоков следующим образом:

// Создаем пул из 4 потоков
val executorService = Executors.newFixedThreadPool(4)

// Планируем на выполнение этот код в одном из этих потоков
executorService.execute {
   val fibonacci10 = synchronousFibonacci(10)
   saveFibonacciInMemory(10, fibonacci10)
}

Хотя управление пулами потоков вручную возможно, корутины являются рекомендуемым решением для асинхронного программирования на Android благодаря встроенной поддержке отмены и упрощению обработки ошибок, структурированному параллелизму, снижающему вероятность утечек памяти, и его интеграции с библиотеками Jetpack.

Под капотом

Что происходит с момента создания корутины до ее выполнения в потоке? Когда вы создаете корутину с помощью стандартных билдеров корутин, вы можете указать, на каком CoroutineDispatcher ее запускать; в противном случае используется Dispatchers.Default .

CoroutineDispatcher отвечает за диспетчеризацию выполнения корутины в поток. Под капотом, когда используется CoroutineDispatcher, он перехватывает корутину, используя метод interceptContinuation, который оборачивает Continuation (то есть корутину) в DispatchedContinuation. Это возможно, потому что CoroutineDispatcher реализует ContinuationInterceptor интерфейс.

Если вы читали мою статью о том, как корутины работают под капотом, вы уже знаете, что компилятор создает стейтмашину, а информация о ней (например, что нужно выполнить дальше) хранится в объекте Continuation.

Метод resumeWith класса DispatchedContinuation отвечает за диспетчеризацию корутины к соответствующему диспатчеру в случае, если Continuation должна быть выполнена в другом диспатчере!

Кроме того, DispatchedContinuation наследуется от абстрактного класса DispatchedTask, который в реализации языка программирования Java, является типом, реализующим интерфейс Runnable. Следовательно, DispatchedContinuation может выполняться в потоке! Разве это не круто? Когда указан CoroutineDispatcher, корутина преобразуется в DispatchedTask, который диспатчится для выполнения в потоке как Runnable!

А теперь… Как вызывается метод dispatch при создании корутины? Когда вы создаете корутину с использованием стандартных билдеров корутин, вы можете указать, как корутина запускается с помощью параметра start типа CoroutineStart. Например, вы можете указать ей запускаться только тогда, когда это необходимо, с помощью CoroutineStart.LAZY. По умолчанию используется CoroutineStart.DEFAULT, который планирует выполнение корутины в соответствии с ее CoroutineDispatcher. Бинго!

Иллюстрация того, как блок кода в корутине попадает на выполнение в поток
Иллюстрация того, как блок кода в корутине попадает на выполнение в поток

Иллюстрация того, как блок кода в корутине попадает на выполнение в поток

Диспатчеры и пулы потоков

Вы можете выполнять корутины в любом из пулов потоков вашего приложения, преобразовав их в CoroutineDispatcher с помощью функции расширения Executor.asCoroutineDispatcher(). В качестве альтернативы вы можете использовать диспатчеры по умолчанию (Dispatchers), которые входят в библиотеку корутин.

Вы можете посмотреть, как Dispatchers.Default инициализируется в методе createDefaultDispatcher. По умолчанию используется DefaultScheduler. Если вы ознакомитесь с реализацией Dispatchers.IO, он также использует DefaultScheduler и позволяет создавать по запросу не менее 64 потоков. Dispatchers.Default и Dispatchers.IO неявно связаны друг с другом, поскольку они используют один и тот же пул потоков, что плавно подводит меня к следующей теме. Каковы накладные расходы во время выполнения при вызове withContext с разными диспатчерами?

Потоки и производительность withContext

Если в рантайме Android создано больше потоков, чем доступно ядер ЦП, переключение между потоками сопряжено с некоторыми накладными расходами во время выполнения. Переключение контекста не из дешевых! ОС должна сохранять и восстанавливать контекст выполнения, а ЦП должен тратить время на планирование потоков, а не на выполнение реальной работы приложения. Кроме того, переключение контекста может произойти, если поток выполняет блокирующий код. Если так обстоят дела с потоками, есть ли какое-либо снижение производительности при использовании withContext с разными диспатчерами?

К счастью, как вы могли догадаться, пулы потоков берут на себя всю сложность управления этим процессом, пытаясь оптимизировать работу, чтобы она выполнялась максимально эффективно (поэтому выполнение работы в пуле потоков лучше, чем выполнение в потоках вручную). От этого также выигрывают и корутины, поскольку они планируются в пулах потоков! Вдобавок ко всему, корутины не блокируют потоки, они вместо этого приостанавливают (suspend) их работу! Это еще эффективнее!

CoroutineScheduler, который является пулом потоков используемым в реализации языка программирования Java по умолчанию, выполняет дистрибьюцию задиспатченых корутин рабочим потокам наиболее эффективным образом. Поскольку Dispatchers.Default и Dispatchers.IO используют один и тот же пул потоков, переключение между ними оптимизировано, чтобы избежать переключения потоков, когда это возможно. Библиотека корутин может оптимизировать эти вызовы, оставаться в том же диспатчере и потоке и следовать fast-path.

Поскольку Dispatchers.Main обычно является другим потоком в приложениях пользовательского интерфейса, переключение между Dispatchers.Default и Dispatchers.Main в корутинах не сопряжено с большими потерями производительности, поскольку корутина просто приостанавливается (то есть перестает выполняться в одном потоке) и становится запланированной на выполнение в другом потоке.

Проблемы параллелизма в корутинах

Корутины ДЕЙСТВИТЕЛЬНО упрощают асинхронное программирование из-за того, насколько простым становится планирование работы в разных потоках. С другой стороны, эта простота может быть палкой о двух концах: поскольку корутины основаны на потоковой модели языка программирования Java, они не могут просто взять и избежать проблем параллелизма, которые влечет за собой эта многопоточная модель. Таким образом, вы должны быть внимательны, чтобы самому их избегать.

За прошедшие годы хорошие практики, такие как иммутабельность, позволили смягчить некоторые проблемы, связанные с потоками, с которыми вы можете столкнуться на практике. Однако есть некоторые случаи, которых нельзя избежать с помощью иммутабельности. Мать всех проблем параллелизма - это управление состоянием! В частности, доступ к мутабельному состоянию в многопоточной среде.

Порядок операций в многопоточном приложении непредсказуем. Помимо оптимизации компилятора, которая может переупорядочить операции, запуск потоков в определенном порядке не гарантирован, а переключение контекста может происходить в любое время. Если при доступе к мутабельному состоянию не приняты необходимые меры предосторожности, потоки, помимо всего прочего, могут видеть устаревшие данные, потерять обновления или пострадать от состояния гонки.

Обратите внимание, что дискуссии на тему мутабельного состояния и порядка доступа не относятся исключительно к языку программирования Java. Они также влияют на корутины в других платформах.

Приложение, использующее корутины, по своей природе является многопоточным. Классы, которые используют корутины и содержат мутабельное состояние, должны принимать меры предосторожности для обеспечения предсказуемости, то есть гарантировать, что код, выполняемый в корутинах, видит самую последнюю версию данных. Таким образом, разные потоки не будут мешать друг другу. Проблемы с параллелизмом могут привести к очень тонким ошибкам, которые очень сложно дебажить в ваших приложениях, даже к гейзенбагам!

Подобные классы не редкость. Возможно, классу необходимо сохранить информацию о вошедшем в систему пользователю в памяти или кэшировать некоторые значения, пока приложение работает. Проблемы с параллелизмом могут возникать и в корутинах, если вы не будете осторожны! Не гарантировано, что функция приостановки (suspend) с использованием withContext(defaultDispatcher) всегда будет выполняться в одном и том же потоке!

Допустим, у нас есть класс, который кэширует транзакции пользователей. Если доступ к кэшу не происходит должным образом, как, например, ниже, могут возникнуть ошибки параллелизма:

class TransactionsRepository(
  private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {

  private val transactionsCache = mutableMapOf<User, List<Transaction>()

  private suspend fun addTransaction(user: User, transaction: Transaction) =
   // ОСТОРОЖНО! Доступ к кэшу не защищен.
   // Возможны ошибки параллелизма: потоки могут видеть устаревшие данные
   // и может возникнуть состояние гонки.
    withContext(defaultDispatcher) {
      if (transactionsCache.contains(user)) {
        val oldList = transactionsCache[user]
        val newList = oldList!!.toMutableList()
        newList.add(transaction)
        transactionsCache.put(user, newList)
      } else {
        transactionsCache.put(user, listOf(transaction))
      }
    }
}

Даже если мы говорим о Kotlin, книга «Параллелизм в Java на практике» Брайана Гетца - отличный ресурс, чтобы узнать больше об этой теме и тонкостях параллелизма в системах языка программирования Java. Кроме того, у Jetbrains есть документация об общем мутабельном состоянии и параллелизме.

Защита мутабельного состояния

Как защитить мутабельное состояние или найти хорошую политику синхронизации, полностью зависит от природы данных и задействованных операций. В этом разделе рассказывается о проблемах параллелизма, с которыми вы можете столкнуться, вместо того, чтобы перечислять все различные способы и API для защиты мутабельного состояния. Тем не менее, вот несколько советов и API, с которых вы можете начать, чтобы сделать ваши мутабельные переменные потокобезопасными.

Инкапсуляция

Мутабельное состояние должно быть инкапсулировано и принадлежать классу. Этот класс централизует доступ к состоянию и будет защищать операции чтения и записи с помощью политики синхронизации, которая лучше соответствует варианту использования.

Привязка к потоку

Решением может быть ограничение доступа к чтению/записи всего до одного потока. Доступ к мутабельному состоянию может быть выполнен посредством производителя/потребителя (producer/consumer) с использованием очереди. У JetBrains есть хорошая документация по этой теме.

Не изобретайте колесо

В рантайме Android есть потокобезопасные структуры данных, которые можно использовать для защиты мутабельных переменных. Например, если вам нужен простой счетчик вы можете использовать AtomicInteger. Или, чтобы защитить map из фрагмента кода выше, вы можете использовать ConcurrentHashMap. ConcurrentHashMap - это потокобезопасная синхронизированная коллекция, которая оптимизирует пропускную способность чтения и записи в map.

Обратите внимание, что потокобезопасные структуры данных не защищают от проблем с упорядочением вызывающих объектов, они просто обеспечивают атомарный доступ к памяти. Они помогают избежать использования блокировок, если логика не слишком сложна. Например, их нельзя использовать в примере с transactionCache, показанном выше, потому что порядок операций и логика между ними требуют защиты потоков и доступа к данным.

Кроме того, данные в этих потокобезопасных структурах данных должны быть иммутабельными или защищенными, чтобы предотвратить состояние гонки при изменении уже хранящихся в них объектов.

Индивидуальные решения

Если у вас есть сложные действия, которые необходимо синхронизировать, @Volatile переменные или потокобезопасные структуры данных не помогут! И возможно, что встроенная аннотация @Synchronized недостаточно детализирована, чтобы сделать ваш вариант использования эффективным.

В этих случаях вам может потребоваться создать собственный механизм синхронизации с использованием параллельных утилит, таких как защелки (latches), семафоры (semaphores) или барьеры (barriers). В других случаях вы можете безоговорочно защитить многопоточный доступ к коду с помощью блокировок (locks) или мьютексов (mutexes).

Mutex в Котлин имеет функции приостановки lock и unlock, чтобы вы могли вручную защитить части кода вашей корутины. Удобно, что функция расширения Mutex.withLock упрощает нам работу:

class TransactionsRepository(
  private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {

  // Мьютекс, защищающий мутабельное состояние кэша
  private val cacheMutex = Mutex()
  private val transactionsCache = mutableMapOf<User, List<Transaction>()

  private suspend fun addTransaction(user: User, transaction: Transaction) =
    withContext(defaultDispatcher) {
      // Мьютекс, защищающий мутабельное состояние кэша
      cacheMutex.withLock {
        if (transactionsCache.contains(user)) {
          val oldList = transactionsCache[user]
          val newList = oldList!!.toMutableList()
          newList.add(transaction)
          transactionsCache.put(user, newList)
        } else {
          transactionsCache.put(user, listOf(transaction))
        }
      }
    }
}

Корутина, приостанавливающая выполнение до тех пор, пока оно не будет продолжено, с использованием Mutex, намного эффективнее, чем блокировка языка программирования Java, которая полностью блокирует поток. Будьте осторожны при использовании классов синхронизации языка программирования Java в корутинах, так как это может заблокировать поток, в котором выполняется корутина, и тем самым создать проблемы с жизнеспособностью.

Блок кода, переданный билдеру корутин, в конечном итоге выполняется в одном или нескольких потоках. Таким образом, корутины работают в потоковой модели рантайма Android со всеми ее ограничениями. С помощью корутин все еще можно написать неправильный уязвимый многопоточный код. Так что следите за доступом к общему мутабельному состоянию в вашем коде!


Перевод статьи подготовлен в преддверии старта курса Android Developer. Basic.

Также приглашаем всех желающих записаться на бесплатный демо-урок по теме: "Хранение данных. Room"

Узнать подробнее о курсе Android Developer. Professional