Я хочу показать три парадигмы асинхронного программирования — callbacks, futures, coroutines на примере простого веб приложения на фрейморке Vertx. Код будем писать на Котлине.

Допустим у нас есть приложение, которое получает некую строку в HTTP запросе, по ней ищет URL в базе данных, идет по этому URL и его содержимое отправляет обратно клиенту.
Vertx задуман как асинхронный фрейворк для высоконагруженных приложений, использует netty, new IO, event bus

Как принято в Vertx один Verticle (аналог актора, если Вы знаете Akka) получает запрос, отправляет по event bus строку по которой надо искать URL некоему другому BusinessVerticle, который и занимается собственно работой.

object Main {
    @JvmStatic
    fun main(args: Array<String>) {
        val vertx =  Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}

class HttpVerticle : AbstractVerticle() {

    @Throws(Exception::class)
    override fun start(startFuture: Future<Void>) {
        val router = createRouter()

        vertx.createHttpServer()
            .requestHandler(router)
            .listen(8080) { result ->
                if (result.succeeded()) {
                    startFuture.complete()
                } else {
                    startFuture.fail(result.cause())
                }
            }
    }

    private fun createRouter(): Router = Router.router(vertx).apply {
        get("/").handler(handlerRoot)
    }


    private val handlerRoot = Handler<RoutingContext> { rc ->
        vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") 
               { resp: AsyncResult<Message<String>> ->
                    if (resp.succeeded()) {
                        rc.response().end(resp.result().body())
                    } else {
                       rc.fail(500)
                   }
        }
    }

}

В стандартном API вся асинхронность делается через колбеки, поэтому начальная имплементация BusinessVerticle будет выглядеть так:

class BusinessVerticle : AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

    override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            if (res.succeeded()) {

                val connection = res.result()

                connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                    if (res2.succeeded()) {
                        try {
                            val url = res2.result().rows[0].getString("url").removePrefix("http://")
                            webclient
                                .get(url,"/")
                                .send { ar ->
                                    if (ar.succeeded()) {
                                        val response = ar.result()
                                        message.reply(response.bodyAsString())
                                    } else {
                                        message.fail(500, ar.cause().message)
                                    }
                                }

                        } catch (e: Exception) {
                            message.fail(500, e.message)
                        }
                    } else {
                        message.fail(500, res2.cause().message)
                    }
                }
            } else {
                message.fail(500, res.cause().message)
            }
        }
    }

}

Выглядет прямо скажем, так себе — callbacks hell, особенно доставляет обработка ошибок.

Попробуем улучшить ситуацию, как нас учат гуру колбеков — выделением каждого колбека в отдельный метод:

 class BusinessVerticle: AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

   override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            handleConnectionCallback(res, message)
        }
    }

    private fun handleConnectionCallback(
        res: AsyncResult<SQLConnection>,
        message: Message<String>
    ) {
        if (res.succeeded()) {

            val connection = res.result()

            connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                handleQueryCallBack(res2, message)
            }
        } else {
            message.fail(500, res.cause().message)
        }
    }

    private fun handleQueryCallBack(
        res2: AsyncResult<ResultSet>,
        message: Message<String>
    ) {
        if (res2.succeeded()) {
            try {
                val url = res2.result().rows[0].getString("url").removePrefix("http://")
                webclient
                    .get(url, "/")
                    .send { ar ->
                        handleHttpCallback(ar, message)
                    }

            } catch (e: Exception) {
                message.fail(500, e.message)
            }
        } else {
            message.fail(500, res2.cause().message)
        }
    }

    private fun handleHttpCallback(
        ar: AsyncResult<HttpResponse<Buffer>>,
        message: Message<String>
    ) {
        if (ar.succeeded()) {
            // Obtain response
            val response = ar.result()
            message.reply(response.bodyAsString())
        } else {
            message.fail(500, ar.cause().message)
        }
    }

}

Ну что, стало лучше. Но тоже так себе.

Много строчек, не особо читаемый код, надо тащить за собой объект message для ответа, размазанная по коду обработка ошибок.

Попробуем переписать эту хрень пользуясь Futures
Особенно хороши Futures тем, что их можно легко комбинировать пользуясь Future.compose()

Для начала переведем стандартные методы Vertx, которые принимают колбек и ничего не возвращают, в методы, которые возвращают Future.

Воспользуемся возможностью Котлина добавлять методы в существующие классы:


fun JDBCClient.getConnectionF(): Future<SQLConnection> {
    val f = Future.future<SQLConnection>()
    getConnection { res ->
        if (res.succeeded()) {
            val connection = res.result()
            f.complete(connection)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun SQLConnection.queryF(query:String): Future<ResultSet> {
    val f = Future.future<ResultSet>()
    query(query) { res ->
        if (res.succeeded()) {
            val resultSet = res.result()
            f.complete(resultSet)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> {
    val f = Future.future<HttpResponse<M>>()
    send() { res ->
        if (res.succeeded()) {
            val response = res.result()
            f.complete(response)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

И превратим наш BusinessVerticle.handleMessage в такое:

  private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                message.fail(500, res.cause().message)
            }
        }

    }

    private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }

Выглядит классно.

Простой, читаемый код. Обработка ошибок в одном месте. Если надо, можно сделать разную реакцию на разные исключения, или, скажем, вынести в отдельную функцию.

Мечта поэта!

Но что будет, если нам надо по какому то условию прекратить цепочку Futures?
Например если нет соответствующей записи в БД мы хотим не выбросить исключение (и код 500 клиенту), а вернуть строку «No record» с кодом 200.

Единственный способ (который я знаю) прекратить цепочку из Future.compose() — выбросить исключение.

Т.е. надо сделать что то такое: определить свой тип исключения, выбросить это исключение если нет записи в базе, обработать это исключение особенным образом.

class NoContentException(message:String):Exception(message)

 private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map {
            if (it.numRows<1)
                throw NoContentException("No records")
            it.rows[0].getString("url").removePrefix("http://")
        }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }
    
    private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                if (res.cause() is NoContentException)
                    message.reply(res.cause().message)
                else
                    message.fail(500, res.cause().message)
            }
        }
    }

Работает!

Но уже выглядит хуже — использовать исключения для управления потоком исполнения — не красиво. А если таких случаев, которые требуют отдельной обработки будет много — код станет намного менее читаемым.

Попробуем сделать то же самое с корутинами котлина.
Про корутины написано много, в том числе на Хабре (1,2,3,...) поэтому отдельно про них писать не буду.

В Vertx последних версий автоматически генерируются корутинные версии всех методов, которые должны принимать колбек.

Подключаем библиотеки
'vertx-lang-kotlin-coroutines'
'vertx-lang-kotlin'

и получаем, например

JDBCClient.getConnectionAwait()
SQLConnection.queryAwait()

и т.д.

Тогда наши методы обработки сообщения превращаются во что то милое и простое:

private suspend fun handleMessage(message: Message<String>) {
        try {
            val content = getContent(message)
            message.reply(content)
        } catch(e:Exception){
            message.fail(500, e.message)
        }

    }

    private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }

ну и надо изменить вызов предоставив контекст корутины:

vertx.eventBus().consumer<String>("my.addr") { message ->
           GlobalScope.launch(vertx.dispatcher()) {  handleMessage(message)}
        }


Что тут происходит?

Все эти методы с Await вызывают код асинхронно, ждут от него результата, а пока ждут, поток (thread) переключается на исполнение другой корутины.

Если мы посмотрим под капот, то это выглядит так:

suspend fun SQLClient.getConnectionAwait(): SQLConnection {
  return awaitResult {
    this.getConnection(it)
  }
}

suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T {
  val asyncResult = awaitEvent(block)
  if (asyncResult.succeeded()) return asyncResult.result()
  else throw asyncResult.cause()
}

suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
  return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    try {
      block.invoke(Handler { t ->
        cont.resume(t)
      })
    } catch (e: Exception) {
      cont.resumeWithException(e)
    }
  }
}

Чем то похоже на нашу самописную реализацию с Futures.

Но тут мы получаем нормальный код — String как возвращаемый тип (вместо Future), try/catch вместо уродливого колбека с AsyncResult

И если нам надо прекратить посредине цепочку исполнения, это выглядит естественно, без всяких исключений:

  private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        if (resultSet.numRows<1)
            return "No records"
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }

По моему, прекрасно!

Комментарии (4)


  1. mystdeim
    23.04.2019 11:53
    +1

    Корутины лучше вызывать на vertx диспатчере:

    GlobalScope.launch(vertx.dispatcher())
    А в случае когда корутина работает в хендлере — это делать обязательно чтобы она крутилась тоже в event loop


    1. javax Автор
      23.04.2019 11:54

      Спасибо!
      А почему важно

      чтобы она крутилась тоже в event loop
      ?


      1. mystdeim
        23.04.2019 12:03
        +1

        не потоко-безопасно будет, тут правило такое: передавать/обрабатывать данные через event loop, а иначе у вас получится, что корутина возьмёт поток из common pool и будет на нём обрабатывать


        1. javax Автор
          23.04.2019 12:06

          Понял, спасибо, исправляю