В своих предыдущих постах я описывал задачи, которые были решены скорее в академических целях. Сегодня я хочу поделиться реальным примером, который работает в продакшене. Речь пойдет о написании сопрограмм, которые можно прервать извне. Изначально мне это понадобилось для реализации механизма deadline или timeout (кому как больше нравится). Согласитесь, довольно часто возникающая задача. На этом примере я продемонстрирую использование еще нескольких возможностей, предоставляемых С++. Речь пойдет об await_transform и конструкторе объекта promise_type.

В UNIX системах вы легко можете прервать любой процесс в любой момент, отправив ему сигнал. И это замечательная возможность! Система гарантирует освобождение всей занятой им памяти и иных ресурсов.

Но вот уже с потоком этот фокус не пройдет — система просто не знает, какие ресурсы процесса принадлежат потоку и должны быть освобождены, а какие еще понадобятся. Обычно такую задачу решают периодической проверкой некого флага, сообщающего о необходимости завершить поток. Проверяя флаг, вы сами решаете, что нужно сделать для корректного завершения.

Корутины можно рассматривать как легковесные потоки. Во многом подходы к принудительному завершению потоков и корутин схожи.

В этот раз не обойдется без boost::asio. Но он будет использован исключительно в демонстрационных целях — в детали его работы я углубляться сильно не буду. А начну по традиции с конца. Давайте напишем простое приложение, которое может быть завершено нажатием CTRL+C. Как раз в нем получилось больше boost, чем во всём остальном коде.

task coroutine() {
  // вся логика приложения вынесена в эту функцию. По понятным причинам
  // main() не может быть корутиной - программа просто завершилась бы
  // при первом же вызове co_await. Но это только теория, компилятор не
  // позволит вам проверить это на практике
  // ...
}

int main(int argc, char *argv[]) {
  // эта переменная - сердце boost::asio обеспечивающее связь вашего
  // приложения и операций ввода/вывода операционной системы
  boost::asio::io_context context;
  // SIGINT - это тот сигнал, который получит ваше приложение, когда вы нажмете CTRL+C
  boost::asio::signal_set signals(context, SIGINT);
  // запускаем наш "настоящий main"
  auto job = coroutine();
  // регистрируем в boost::asio наш обработчик сигнала SIGINT
  signals.async_wait([job = std::move(job)] (auto code) {
    // при получении сигнала прерываем наше приложение
    job.terminate();
  });
  // context.run() будет выполняться до тех пор, пока не будут обработаны
  // все зарегистрированные в этом context события. Так что, мы получим
  // красивый так называемый graceful shutdown
  // на самом деле, я так не делаю. Дело в том, что такой подход лишает
  // ваше приложение возможности завершиться по собственному желанию. В
  // context будет висеть и ждать своего часа обработчик сигнала. Эта
  // проблема может быть решена различными способами, но для
  // упрощения примера мы не будем рассматривать данную ситуацию
  context.run();
}

Что же дают нам для реализации подобного механизма корутины С++? И всё, и ничего. В комментариях к моей статье генераторы на корутинах С++ мне задавали вопрос: что случится, если генератор (корутину) не вызовут после очередного co_yield? Ответ был прост — всё будет хорошо, если все ресурсы внутри генератора освобождаются при помощи RAII. Завершаясь, корутина уничтожит все локальные переменные, вызывая для них деструкторы. Поэтому, используя идиому RAII, вы как бы объясняете компилятору, какие ресурсы принадлежат корутине. А использованные вами операторы co_await и co_yield будут как раз теми точками, где вы будете проверять флаг — требование завершить корутину. На самом деле всё еще круче! Вместо того, чтобы думать о том, как завершить корутину, вам надо просто её не возобновлять. То есть, не делать ничего. Компилятор всё сделает за вас. Казалось бы — успех! Ставим точку, дальше и писать не о чем :)

В действительности это хорошо сработает только при использовании co_yield.

Не всё так просто или что делать, если вам надоело ждать

В чем же проблема co_await ? Я уже касался этой проблемы в другой моей статье про каналы на С++. Дело в том, что co_await не просто ждёт возобновления корутины, он ждет возникновения некого события (например чтения при помощи boost::asio::async_read). И это событие неплохо было бы отменить. Во-первых — чтобы не загружать приложение ненужной работой. А во вторых awaitable объект может использовать ссылки на локальные переменные корутины. Когда awaitable дождется события и начнет сохранять полученные данные в несуществующий уже буфер, в лучшем случае вы получите аварийное завершение приложения.

Почему в лучшем? Потому что будет хотя бы ясно где ошибка. Если вам «повезет», и вы просто перезапишите данные в какой-нибудь случайной переменной, которой не повезло получить область памяти с тем же адресом, найти источник проблемы будет гораздо сложнее.

Находить подобные проблемы очень помогает address sanitizer. Я настоятельно рекомендую отладочные сборки делать именно с ним.

Для решения этой проблемы я решил сделать свои awaitable объекты прерываемыми. И для этого добавил в них еще один метод — on_terminate. Давайте рассмотрим пример реализации операции sleep. Она может выглядеть следующим образом:

template <typename Duration>
auto sleep(Duration duration) {
  struct [[nodisacrd]] awaitable {
    // для асинхронного sleep я использовал таймер из boost
    boost::asio::steady_timer timer;
    // резервируем в нашем awaitable объекте место под результат
    boost::system::error_code error{};
    
    bool await_ready() const { return false; }
    void await_suspend(std::coroutine_handle<> coro) {
      // регистрируем событие в io_context, ссылку на который мы 
      // неявно получили через executor при создании steady_timer
      // смотри ниже
      timer.async_wait([coro] (auto ec) mutable {
        if (!error) { error = ec; }
        // при получении события возобновляем корутину
        coro.resume();
      });
    }
    void on_terminate(boost::system::error_code ec) {
      error = ec;
      // cancel прервет текущую асинхронную операцию, что приведет к вызову
      // callback (нашей лямбды) с ec = boost::asio::error::operation_aborted
      timer.cancel();
    }
    // нашему awaitable нечего возврящать, но в случае
    // возникновения ошибки он выбрасывает исключение
    void await_resume() {
      if (error) { throw boost::system::system_error(error); }
    }
  };
  // на самом деле, не очень честно заводить таймер здесь, потому что
  // отсчет времени начнется с момента создания awaitable объекта, а не
  // с момента вызова co_await. В некоторых случаях это может быть
  // существенно
  // еще вы можете спросить: что такое executor и откуда он взялся?
  // это переменная типа boost::asio::any_io_executor, обычно я передаю
  // её в подобные функции параметром, но всё больше склоняюсь к мысли
  // использовать для этого глобальные thread_local переменные
  return awaitable{.timer = boost::asio::steady_timer{executor, std::forward<Duration>(duration)}};
};

boost позволяет любой вызов асинхронный вызов (async_XXX) превратить в awaitable объект при помощи механизма completion token. Для этого надо в качестве callback передать boost::asio::use_awaitable. Но в нашем случае это ничем нам не поможет.

Для того, чтобы понять как этот метод нам поможет, давайте теперь напишем наш task. Идея в следующем: при каждом вызове co_await мы будем запоминать ссылку на awaitable объект и в случае вызова terminate — вызывать метод on_terminate.

В этом нам как раз и поможет метод promise_type::await_transform(). Если он определен, то перед каждом вызовом co_await в него передается awaitable объект. Метод, в свою очередь, должен вернуть тоже awaitable объект (может вернуть тот же, может другой). И именно этот, полученный от await_transform объект, будет передан в co_await.

class task {
public:
  struct promise_type {
    struct state_t {
      // функтор для вызова on_terminate из текущего awaitable объекта
      std::function<void(boost::system::error_code)> callback;
    };
    std::shared_ptr<state_t> state = std::make_shared<state_t>();
    // никаких лишних остановок корутины ни до
    std::suspend_never initial_suspend() const noexcept { return {}; }
    // ни после
    std::suspend_never final_suspend() const noexcept { return {}; }
    // нашему task надо иметь ссылку на состояние, чтобы вызвать on_terminate
    auto get_return_object() { return task{state}; }
    // корутина ничего не возвращает
    void return_void() { }
    // просто игнорируем все непойманные исключения,
    // они будут приводить к корректному завершению корутин
    void unhandled_exception() { }
    
    template <typename Awaitable>
    auto await_transform(Awaitable &&awaitable) {
      // методы wrapper вызывают соответствующие методы из базового awaitable
      struct [[nodiscard]] wrapper {
        Awaitable base;
        std::shared_ptr<state_t> m_state;
        
        auto await_ready() { return base.await_ready(); }
        auto await_suspend(std::experimental::coroutine_handle<> coro) {
          // сохраняем ссылку на awaitable через замыкание
          m_state->callback = [this] (boost::system::error_code ec) {
            base.on_terminate(ec);
          });
          return base.await_suspend(coro);
        }
        auto await_resume() {
          return base.await_resume();
        }
      };
      return wrapper{std::forward<Awaitable>(awaitable), state};
    }
  };
  
  // как только корутина завершится, promise_type будет уничтожен
  // наш weak_ptr будет указывать на null. В остальных случаях,
  // при однопоточной работе, state будет указывать на текущий
  // awaitable объект, который прервал выполнение корутины
  void terminate(boost::system::error_code ec = boost::asio::error::interrupted) {
    if (auto ptr = m_state.lock(); ptr != nullptr && ptr->callback != nullptr) {
      ptr->callback(ec);
      // защищаемся от повторного вызова terminate()
      ptr->callback = nullptr;
    }
  }
  
private:
  // сохрани мы ссылку на promise_type - и перед нами встал бы вопрос:
  // как узнать, завершилась ли корутина. Данный подход избавляет нас
  // от этой диллемы
  std::weak_ptr<promise_type::state_t> m_state;
  
  explicit task(std::weak_ptr<promise_type::state_t> state)
    : m_state{std::move(state)} {}
};

Мы получили искомое поведение. Правда, наш terminate не завершает корутину немедленно, а лишь приводит к возникновению ошибки (exception). Как по мне, это более прямой и гибкий путь, но незначительно поменяв логику await_suspend, можно сделать и немедленное завершение.

Полный код получившегося приложения
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <experimental/coroutine>
#include <iostream>

namespace std {
  using namespace experimental;
} // end of namespace std

thread_local boost::asio::any_io_executor current_executor;

class task {
  public:
  struct promise_type {
    struct state_t {
      bool locked = false;
      std::function<void(boost::system::error_code)> on_terminate;
      void lock() { locked = true; }
      void unlock() { locked = false; }
      using ptr = std::shared_ptr<state_t>;
    };
    state_t::ptr state = std::make_shared<state_t>();

    auto get_return_object() { return task{state}; }
    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() { }
    template <typename Awaitable>
    auto await_transform(Awaitable &&awaitable) {
      struct [[nodiscard]] wrapper {
        std::shared_ptr<state_t> state;
        Awaitable awaitable;
        bool await_ready() { return awaitable.await_ready(); }
        auto await_suspend(std::coroutine_handle<> coro) {
          state->on_terminate = [this] (boost::system::error_code ec) {
            awaitable.on_terminate(ec);
          };
          return awaitable.await_suspend(coro);
        }
        auto await_resume() {
          return awaitable.await_resume();
        }
      };
      return wrapper{state, std::forward<Awaitable>(awaitable)};
    }
  };

  void terminate(boost::system::error_code ec = boost::asio::error::interrupted) {
    if (auto ptr = m_state.lock(); ptr != nullptr && ptr->on_terminate != nullptr && !ptr->locked) {
      ptr->on_terminate(ec);
      ptr->on_terminate = nullptr;
    }
  }

  using state_t = promise_type::state_t::ptr;
private:
  std::weak_ptr<promise_type::state_t> m_state;
  task(std::weak_ptr<promise_type::state_t> state) : m_state(std::move(state)) {}
};

template <typename Duration>
auto sleep(Duration &&duration) {
  struct [[nodiscard]] awaitable {
    boost::asio::steady_timer timer;
    boost::system::error_code error{};

    bool await_ready() { return false; }
    void await_suspend(std::coroutine_handle<> coro) {
      timer.template async_wait([this, coro] (auto ec) mutable {
        if (!error) { error = ec; }
        coro.resume();
      });
    }
    void on_terminate(boost::system::error_code ec) {
      error = ec;
      timer.cancel();
    }
    void await_resume() {
      if (error) { throw boost::system::system_error(error); }
    }
  };
  return awaitable{boost::asio::steady_timer{current_executor, std::forward<Duration>(duration)}};
}

task coroutine() {
  using namespace std::chrono_literals;
  std::cout << "before sleep" << std::endl;
  try {
    co_await sleep(5s);
    std::cout << "after sleep" << std::endl;
  } catch (...) { }
  std::cout << "final cleanup" << std::endl;
  co_await sleep(2s);
}

int main(int argc, char *argv[]) {
  boost::asio::io_context context;
  current_executor = context.get_executor();
  boost::asio::signal_set signals{context, SIGINT};
  auto job = coroutine();
  signals.async_wait([job = std::move(job)] (auto ec, auto code) mutable {
   job.terminate();
  });
  context.run();
  return 0;
}

для мгновенного завершения корутины код await_suspend может быть таким:

void await_suspend(std::coroutine_handle<> coro) {
  timer.template async_wait([this, coro] (auto ec) mutable {
    if (!error) {
      error = ec;
      coro.resume();
    } else {
      coro.destroy();
    }
  });
}

Еще await_transform удобен для сохранения контекста выполнения. Если при использовании потоков с этим замечательно справляются thread_local переменные, при асинхронной работе это вызывает определенные трудности.

Например, если вы хотите записывать в журнал request id обрабатываемого запроса, можно конечно таскать logger во все функции параметром. Но это очень утомительно. Всё же удобнее иметь его в глобальной переменной.

Не прерывайте прерывание

Но иногда хочется сказать: «Постойте! Вот сейчас меня прерывать не надо»

Например, если вы уже выполняете операцию очистки при завершении корутины. В таких случаях RAII спасает не всегда. Иногда хочется выполнить какие-нибудь асинхронные вызовы, а деструктор не может быть корутиной. Или же для операции требуется большое количество локальных переменных. Что можно сделать в этой ситуации?

  • запустить еще одну корутину в конце текущей или из деструктора и выполнить требуемые действия в ней;

  • можно выполнить какой-нибудь специальный co_await и обработать его в том же await_transform;

  • можно написать шаблон-декоратор, который скроет от await_transform метод on_terminate, и научить наш task работать с такими awaitable объектами.

А можно передать состояние корутины в нее параметром. И добавить в это состояние метод, запрещающий прерывание. Дело в том, что если promise_type имеет конструктор, принимающий такой же набор параметров, как и корутина, он будет вызван и в него будут переданы ссылки на копии параметров.

Параметры корутин, в отличие от параметров обычных функций, передаются не через стек. Их копии сохраняются в куче в момент вызова корутины.

Компилятор может принять решение хранить параметры и локальные переменные корутины в стеке, если её время жизни меньше времени жизни вызывающей функции.

Таким образом, мы получаем в конструкторе promise_type доступ к параметрам корутины. Причем мы можем не только читать их, но и менять!

struct promise_type {
  struct state_t {
    bool locked = false;
    std::function<void(boost::system::error_code)> callback;
    void lock() { locked = true; }
    void unlock { locked = false; }
    using ptr = std::shared_ptr<state_t>;
  };
  ptr state;

  // на случай, если наша корутина вообще не имеет параметров
  promise_type() : state{std::make_shared<state_t>()} { }

  // этот конструктор подойдет под любой набор параметров
  template <typename ...Args>
  explicit promise_type(Args & ...args) : promise_type{} {
    // используем fold expression: для каждого параметра будет создана
    // лямбда, затем все их результаты сложены при помощи логического "или"
    //
    // конструкция выглядит страшно, но фактически она скомпилируется
    // в одну операцию присваивания для первого подходящего по типу
    // параметра. Остальное будет безжалостно выкинуто в процессе
    // оптимизации. Кто не верит, вот ссылка на compiler explorer
    // https://godbolt.org/z/hsrPErqfn
    ([this] (auto &param) {
      if constexpr (std::is_same_v<Args, state_t::ptr>) {
        // мы ЗАПИСЫВАЕМ новое значение в параметр корутины
        // и делаем это фактически до её запуска
        param = state;
        return true;
      }
      return false;
    }(args) || ...);
  }
  // ...
};

Это небольшое улучшение позволяет нам написать следующий код:

task coroutine(task::state_t lock = {}) {
  using std::chrono_literals;
  try {
    std::cout << "we are doing our work here" << std::endl;
  	co_await sleep(10s);
    std::cout << "work is complete" << std::endl;
  } catch (...) { }
  std::cout << "final cleanup" << std::endl;
  // я назвал методы state - lock и unlock, это дает
  // мне возможность использовать std::lock_guard
  const std::lock_guard g(*lock);
  // следующий sleep уже не будет прерван
  co_await sleep(5s);
}

Еще одно ограничение корутин: секция catch не может содержать точек переключения co_await или co_yield.

З.Ы.

Механизмы, рассмотренные сегодня, позволяют реализовывать поистине фантастические вещи. А их сочетание может дать просто волшебные возможности и сорвать крышу. Только подумайте, оператор внутри функции может получить неявный доступ к её параметрам. Не уверен, что могу сходу придумать удачный пример использования подобной связки.

Но при аккуратном подходе, в первую очередь разработчиками библиотек и фреймворков, конструкторы promise_type и await_transform могут позволить изящно решить массу задач асинхронного программирования.

Комментарии (4)


  1. Door
    14.10.2021 19:03

    Мне кажется, использование std::shared_ptr здесь лишнее. Вы либо овните корутину в task-е (мануально вызывая .destroy() + suspend_always из final_suspend()) либо "запоминаете все текущие корутины которые активны и которые можно и нужно терминейтить". Это тоже можно сделать пробросив ссылку на что-то что запоминает и регистрируя/удаляя корутину когда случается initial_suspend/final_suspend.


    1. Doktor3lo Автор
      15.10.2021 02:56

      "Овнить" корутину в task - это вариант. Для моего примера он пожалуй даже лучше - проще. Но я использую shared_ptr, чтобы иметь возможность просто запустить корутину как обычную функцию и забыть про нее. Не скажу, что это идеальное решение, так как по сути происходит неявный запуск потока, и я подумываю от такого подхода отказаться в пользу spawn как в boost. Но пока я им пользуюсь.

      Второй предложенный вами вариант - откровенно сложнее. И что он дает? Экономит один malloc (и то не всегда, где-то же вы список корутин будете хранить)


      1. Door
        15.10.2021 14:21
        +1

        возможность просто запустить корутину как обычную функцию и забыть про нее

        Хм, как же её тогда терминейтить? Кому-то нужно иметь референс на таск, чтобы вызвать job.terminate(), как у вас в мейне, в примере?

        spawn как в boost

        это не меняет, наверное, модель того что таска овнит корутину. В случае spawn-а таску (соответственно и корутину) будут овнить внутренности экзекютора в бусте/азио/либо аналог кастомной реализации.

        Экономит один malloc (и то не всегда, где-то же вы список корутин будете хранить)

        Ну, не совсем. Можно иметь интрузивный список корутин, которые вас интересуют.

        откровенно сложнее

        но, на практике, имея как идею "останавливать все корутины при каком-то событии", вам нужен список всех таких корутин, чтобы вызвать job.terminate(), не так ли ?

        В целом, мне кажется, первый вариант с тем что таска овнит корутину - был бы проще, конкретно для вашего примера в статье. И убрал бы с интернета ещё один пример использования shared_ptr ^_^


        1. Doktor3lo Автор
          16.10.2021 02:19

          Хм, как же её тогда терминейтить?

          Никак. Но я ведь не утверждаю, что надо мочь прерывать абсолютно все. Как раз на моей практике это не так. Чаще приходится прерывать "бесконечные" корутины или же awaitable объекты, которые реализованы через корутину, как часть тех же "бесконечных" корутин. А большинства проще и правильнее просто дождаться.

          Данная реализация, так сказать, универсальная. Но я согласен, что эффективнее с точки зрения ресурсов и производительности использовать специфичные реализации под разные задачи.

          В целом вы правы, но пример я уже не буду исправлять. Иначе наша переписка потеряет смысл :)