Я думаю сейчас не осталось людей, незнакомых с корутинами в Kotlin. Волшебный инструмент, согласны? Ещё более волшебным в них я нахожу возможно вынести вычисление в другой поток:

fun main() = runBlocking {
	println("Hello from ${Thread.currentThread().name}")
  withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Результат:
// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to main

Всего одна строка и мы получаем привет из другого потока. Как работает этот механизм? На самом деле до боли просто, если рассмотреть CoroutineDispatcher, можно заметить там два важных метода:

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

Первый отвечает за необходимость вызова dispatch, а вот второй немного интереснее, он отвечает за выполнение переданного в block Runnable в другом потоке. Стоит обратить внимание, что dispatch должен гарантировать выполнение block, иначе можно словить deadlock и корутина никогда не продолжит выполнение, так же метод не должен не отложено выполнять block, при необходимости продолжить выполнение на текущем потоке лучше вернуть false из isDispatchNeeded.

Стандарт?

У нас есть 4 стандартных диспатера, доступ к которым можно получить из класса Dispatchers: Default, Unconfined, Main, IO.

Unconfined: Самым простым из них является Unconfined, который не будет менять поток выполнения и код из начала статьи уже перестанет быть таким интересным:

runBlocking(Dispatchers.Unconfined) {
	println("Hello from ${Thread.currentThread().name}")
	withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Результат:
// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to DefaultDispatcher-worker-2)

Достигается это за счет isDispatchNeeded который вечно возвращает false и не допускает вызова dispatch(Справедливости ради стоит отметить, что dispatch всё таки может быть вызван из yield(), но это уже совсем другая история).

Default, IO: Эти два реализованы на основе ExecutorCoroutineDispatcher с сложностями в виде реализации своего Executor'а, Default наследует SchedulerCoroutineDispatcher, который выполняет задачи отправляя их в CoroutineScheduler, это кастомный Executor, с пулом потоков, равному количеству потоков процессора(минимум 2) для CPU-bound задач, который может расширяться до maxPoolSize, равный системному параметру kotlinx.coroutines.scheduler.max.pool.size, или 2097150 максимум, для блокирующих задач(IO). IO Dispatcher работает через Default ограничивая себя потоками равными системному параметру kotlinx.coroutines.io.parallelism или количеству потоков процессора(минимум 64)). CoroutineScheduler должен понимать блокирующая задача или нет и реализуется это методом dispatchWithContext у SchedulerCoroutineDispatcher, где явно указывается тип задачи: BlockingContext из IO и NonBlocking для любых задач из Default.

Main: То, ради чего всё начиналось. Сами корутины(coroutine-core) не предоставляют реализацию MainCoroutineDispatcher, а лишь только механизм его загрузки. Загрузкой занимаются класс MainDispatcherLoader, который использует ServiceLoader и FastServiceLoader(Используется только для Android), который явно пытается инициализировать kotlinx.coroutines.android.AndroidDispatcherFactory. Если MainDispatcherLoader не найдет реализаций MainDispatcherFactory или createDispatcher выкинет исключение, будет создан стандартный MissingMainCoroutineDispatcher, выкидывающий на всё исключения.

Рассмотрим реализацию в Android:

В Android MainCoroutineDispatcher реализован на основе Handler, инициализацией занимается AndroidDispatcherFactory:

override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
	val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
	return HandlerContext(mainLooper.asHandler(async = true))
}

@VisibleForTesting
internal fun Looper.asHandler(async: Boolean): Handler {
	// Async support was added in API 16.
	if (!async || Build.VERSION.SDK_INT < 16) {
		return Handler(this)
	}
	if (Build.VERSION.SDK_INT &gt;= 28) {
    // TODO compile against API 28 so this can be invoked without reflection.
    val factoryMethod = Handler::class.java.getDeclaredMethod("createAsync", Looper::class.java)
    return factoryMethod.invoke(null, this) as Handler
	}

	val constructor: Constructor&lt;Handler&gt;
	try {
    constructor = Handler::class.java.getDeclaredConstructor(Looper::class.java,
        Handler.Callback::class.java, Boolean::class.javaPrimitiveType)
	} catch (ignored: NoSuchMethodException) {
    // Hidden constructor absent. Fall back to non-async constructor.
    return Handler(this)
	}
	return constructor.newInstance(this, null, true)
}

Сам HandlerContext реализует MainCoroutineDispatcher с Delay и выносит выполнение на главный поток используя Handler::post:

override fun dispatch(context: CoroutineContext, block: Runnable) {
	if (!handler.post(block)) {
		cancelOnRejection(context, block)
	}
}

Delay же нужен для переопределения механизма работы функции delay(), которая по умолчанию работает на выделенном потоке, на Android же это будет работать через handler.postDelayed. Так же тут можно посмотреть на реализацию isDispatchNeeded, который для MainCoroutineDispatcher.immediate не будет вызывать dispatch при условии, что вы уже на главном потоке.

Своя реализация MainCoroutineDispatcher: Мне стало интересно, как же затащить корутины в уже существующий проект на Java, с уже реализованным Event-Loop'ом. Благо, у меня есть игровой сервер для экспериментов полностью написанный на Java, работающий в несколько потоков с Event-Loop на главном. Начать стоит с реализации MainCoroutineDispatcher:

internal class ServerDispatcher(
	private val invokeImmediately: Boolean
) : MainCoroutineDispatcher() {
	@Volatile
	private var _immediate = if (invokeImmediately) this else null

	override val immediate = _immediate ?: ServerDispatcher(true).also { _immediate = it }

	override fun isDispatchNeeded(context: CoroutineContext): Boolean =
    !invokeImmediately || !Server.getInstance().isPrimaryThread

	override fun dispatch(context: CoroutineContext, block: Runnable) {
    Server.getInstance().scheduler.scheduleTask(block)
	}

	override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    throw UnsupportedOperationException("limitedParallelism is not supported for ${this::class.qualifiedName}")
	}
}

Тут isDispatchNeeded ничем не отличается от такового в Android, различие тут в dispatch, который кладет Runnable в очередь задач, которые разбираются и выполняются в цикле на главном потоке. Смотрим как работает:

val scope = CoroutineScope(ServerDispatcher(invokeImmediately = false) + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// Результат:
// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

Убедились, что всё работает, теперь пора сделать загрузку. Создаем фабрику:

class ServerDispatcherFactory : MainDispatcherFactory {
	override val loadPriority: Int = Int.MAX_VALUE
	override fun createDispatcher(
    allFactories: List<MainDispatcherFactory>,
  ): MainCoroutineDispatcher = ServerDispatcher(invokeImmediately = false)
}

Идём в ресурсы и кладем файл kotlinx.coroutines.internal.MainDispatcherFactory в META-INF/services с содержимым:

dev.israpil.coroutines.mygameserver.ServerDispatcherFactory

Проверяем:

val scope = CoroutineScope(Dispatchers.Main.immediate + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// Результат:
// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

Наслаждаемся корутинами.

Комментарии (0)