Есть потрясающая библиотека Rust под названием loom, которую можно использовать для тщательного тестирования неблокируемых (lock-free) структур данных. Я давно хотел разобраться, как она работает. И сейчас хочу! Но недавно я случайно реализовал небольшой эксперимент, который, как мне кажется, содержит часть идей loom, поэтому о нём стоит написать. Моя цель — не научить вас тому, что нужно использовать на практике (если вы хотите этого, то почитайте документацию loom), а, скорее, вывести пару идей из фундаментальных принципов.

▍ Раз, два, три, два


Как обычно, для экспериментов нам понадобится максимально простой пример. Пример, который мы будем использовать, взят из одной замечательной статьи.

Узрите простой (и поломанный) конкурентный счётчик:

use std::sync::atomic::{
  AtomicU32,
  Ordering::SeqCst,
};
#[derive(Default)]
pub struct Counter {
  value: AtomicU32,
}
impl Counter {
  pub fn increment(&self) {
    let value = self.value.load(SeqCst);
    self.value.store(value + 1, SeqCst);
  }
  pub fn get(&self) -> u32 {
    self.value.load(SeqCst)
  }
}

Баг здесь сразу заметен — инкремент не является атомарным. Но какой тест лучше всего написать, чтобы выявить его?

▍ Тривиальный тест


Первым делом в голову приходит просто атаковать один и тот же счётчик из нескольких потоков, а затем проверить в конце результат:

#[test]
fn threaded_test() {
  let counter = Counter::default();
  let thread_count = 100;
  let increment_count = 100;
  std::thread::scope(|scope| {
    for _ in 0..thread_count {
      scope.spawn(|| {
        for _ in 0..increment_count {
          counter.increment()
        }
      });
    }
  });
  assert_eq!(counter.get(), thread_count * increment_count);
}

Тест успешно проваливается:

thread 'counter::trivial' panicked:
assertion `left == right` failed
  left: 9598
 right: 10000

Но я бы не назвал этот тест удовлетворительным — он очень сильно зависит от тайминга, поэтому его невозможно воспроизводить детерминированно и отлаживать. Кроме того, его нельзя минимизировать — если снизить количество потоков и инкременты, то прохождение теста будет зависеть от удачи!

▍ PBT


Разумеется, нам хотелось бы применить здесь настоящее тестирование! Задача почти подходит для этого: у нас есть входные данные, которые легко генерировать (последовательность инкрементов, распределённая по множеству потоков), удобное для проверки свойство (результат конкурентных инкрементов должен быть идентичен последовательному исполнению) и стремление минимизировать тест.

Но как нам подключить потоки в тестирование на основе свойств (property-based test, PBT)?

PBT отлично подходят для тестирования конечных автоматов. Можно пропустить конечный автомат через последовательность шагов; на каждом шаге PBT выбирает произвольное следующее действие, применяемое к состоянию:

#[test]
fn state_machine_test() {
  arbtest::arbtest(|rng| {
    // Это наш конечный автомат!
    let mut state: i32 = 0;
    // Мы будем выполнять его случайное количество шагов (до ста).
    let step_count: usize = rng.int_in_range(0..=100)?;
    for _ in 0..step_count {
      // На каждом этапе мы бросаем монетку и
      // выполняем или инкремент, или декремент.
      match *rng.choose(&["inc", "dec"])? {
        "inc" => state += 1,
        "dec" => state -= 1,
        _ => unreachable!(),
      }
    }
    Ok(())
  });
}

Кажется, можно применить здесь ту же методику. На каждой итерации мы будем выбирать случайный поток, чтобы он делал один шаг. Если можно выполнять шаги потоков вручную, должно быть просто маневрировать одним потоком между загрузкой и сохранением другого потока.

Но мы не можем пошагово выполнять потоки! Или можем?

▍ Простая оснастка


Давайте пока притворимся, что можем это делать! Взглянем на метод инкремента с багом:

pub fn increment(&self) {
  let value = self.value.load(SeqCst);
  self.value.store(value + 1, SeqCst);
}

В идеале нам бы хотелось как-то «приостанавливать» поток между атомными операциями. Примерно так:

pub fn increment(&self) {
  pause();
  let value = self.value.load(SeqCst);
  pause();
  self.value.store(value + 1, SeqCst);
  pause();
}
fn pause() {
    // ¯\_(ツ)_/¯
}

Начнём с реализации собственной обёртки вокруг AtomicU32, содержащей вызовы «паузы».

use std::sync::atomic::Ordering;
struct AtomicU32 {
  inner: std::sync::atomic::AtomicU32,
}
impl AtomicU32 {
  pub fn load(&self, ordering: Ordering) -> u32 {
    pause();
    let result = self.inner.load(ordering);
    pause();
    result
  }
  pub fn store(&self, value: u32, ordering: Ordering) {
    pause();
    self.inner.store(value, ordering);
    pause();
  }
}
fn pause() {
  // всё ещё не знаю, как её реализовать :(
}

▍ API управляемых потоков


Правило создания отличной структуры API заключается в том, чтобы начать с одного пользователя API, чтобы понять, как API должен выглядеть, и только потом переходить к настоящей реализации.

Поэтому давайте продолжим притворяться: просто напишем PBT, используя эти управляемые потоки с возможностью паузы, несмотря на то, что мы пока всё ещё не представляем, как же реализовать эту паузу.

Начнём с создания счётчика и двух управляемых потоков. Вероятно, мы захотим передавать каждому из потоков ссылку на счётчик:

let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);

Теперь нам нужно пошагово выполнять потоки:

while !rng.is_empty() {
  let coin_flip: bool = rng.arbitrary()?;
  if t1.is_paused() {
    if coin_flip {
      t1.unpause();
    }
  } else if t2.is_paused() {
    if coin_flip {
      t2.unpause();
    }
  }
}

Можно отрефакторить это в чуть более семантически сжатый вид:

let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);
let threads = [t1, t2];
while !rng.is_empty() {
  for t in &mut threads {
    if t.is_paused() && rng.arbitrary()? {
      t.unpause()
    }
  }
}

То есть на каждом шаге нашего конечного автомата мы будем в цикле проходить по всем потокам и снимать с паузы их случайное подмножество.

Но кроме включения и отключения паузы нам нужно, чтобы потоки что-то делали, чтобы выполнялся инкремент счётчика. Например, можно скопировать идею API std::spawn и передавать замыкание:

let t1 = managed_thread::spawn({
  let counter = &counter;
  move || {
    for _ in 0..100 {
      counter.increment();
    }
  }
});

Но так как эти потоки управляемы и нам нужно контролировать их из тестов, давайте дадим контролирующему потоку возможность менять код, исполняемый в управляемом потоке. То есть мы будем запускать управляемые потоки без функции main и предоставим API для исполнения произвольных замыканий в контексте этого инертного по умолчанию потока:

let counter = Counter::default();
// Мы передаём state, &counter, in, но во всём остальном поток инертный.
let t = managed_thread::spawn(&counter);
// Однако мы можем вручную управлять им:
t.submit(|thread_state: &Counter| thread_state.increment());
t.submit(|thread_state: &Counter| thread_state.increment());

Соединив всё вместе, получим красивый тест свойств:

#[cfg(test)]
use managed_thread::AtomicU32;
#[cfg(not(test))]
use std::sync::atomic::AtomicU32;
#[derive(Default)]
pub struct Counter {
  value: AtomicU32,
}
impl Counter {
  // ...
}
#[test]
fn test_counter() {
  arbtest::arbtest(|rng| {
    // Наша "тестируемая конкурентная система".
    let counter = Counter::default();
    // Последовательная модель, с которой мы будем сравнивать результаты.
    let counter_model: u32 = 0;
    // Два управляемых потока, которые мы будем пошагово выполнять
    // вручную.
    let t1 = managed_thread::spawn(&counter);
    let t2 = managed_thread::spawn(&counter);
    let threads = [t1, t2];
    // Основная часть теста: бросаем в цикле монетку и двигаем вперёд
    // один из потоков.
    while !rng.is_empty() {
      for t in &mut [t1, t2] {
        if rng.arbitrary() {
          if t.is_paused() {
            t.unpause()
          } else {
            // Стандартное свойство "эквивалентности модели": применяем
            // изоморфные действия к системе и к её модели.
            t.submit(|c| c.increment());
            counter_model += 1;
          }
        }
      }
    }
    for t in threads {
      t.join();
    }
    assert_eq!(counter_model, counter.get());
    Ok(())
  });
}

Эх, вот если бы смогли заставить работать этот API… А пока наша реализация pause — это пожимающий плечами эмодзи!

Возможно, вы уже сердитесь на меня за то, что я риторически притворяюсь, что не знаю ответа. Не стоит сердиться: когда я впервые писал этот код, то следовал именно таким путём — осознал, что мне нужна возможность приостановки AtomicU32, поэтому реализовал это (с имитацией вызовов паузы), а затем поэкспериментировал с API, который хотел получить, придя приблизительно к этой же точке, пока ещё не зная, как заставить его работать и возможно ли это вообще.

Хотя если откровенно, я кое-что знал заранее. Не думаю, что можно избежать здесь создания реальных потоков, если только не придумать что-то крайне сложное со встроенным ассемблерным кодом. Когда что-то вызывает эту функцию pause() и мы хотим, чтобы это что-то оставалось на паузе до дальнейшего уведомления, то это обязано происходить в потоке, хранящем стек отдельно от стека нашего теста. А если мы собираемся порождать потоки, то с тем же успехом можем порождать и потоки области видимости (scoped thread), чтобы можно было свободно заимствовать локальные для стека данные. А чтобы породить поток области видимости, нужен параметр Scope. То есть в реальности нам потребуется здесь ещё один уровень отступов:

    std::thread::scope(|scope| {
      let t1 = managed_thread::spawn(scope, &counter);
      let t2 = managed_thread::spawn(scope, &counter);
      let threads = [t1, t2];
      while !rng.is_empty() {
        for t in &mut [t1, t2] {
          // ...
        }
      }
    });

▍ Реализация управляемых потоков


А теперь самое интересное: как нам реализовать включение и отключение паузы? Для начала необходима какая-то коммуникация между основным потоком (t.unpause()) и управляемым потоком (pause()). А поскольку мы не хотим менять Counter API под какой-то контекст только для теста, необходимо контекст передать «контрабандой». То есть нужно использовать thread_local!. И этот контекст будет общим для двух потоков, поэтому его нужно обернуть в Arc.

struct SharedContext {
  // ¯\_(ツ)_/¯
}
thread_local! {
  static INSTANCE: RefCell<Option<Arc<SharedContext>>> =
    RefCell::new(None);
}
impl SharedContext {
  fn set(ctx: Arc<SharedContext>) {
    INSTANCE.with(|it| *it.borrow_mut() = Some(ctx));
  }
  fn get() -> Option<Arc<SharedContext>> {
    INSTANCE.with(|it| it.borrow().clone())
  }
}

Как это обычно бывает при использовании thread_local! или lazy_static!, удобно сразу же обернуть их в функции доступа с более удобной типизацией. А учитывая, что мы всё равно используем здесь Arc, можно избежать использования with thread_local, клонировав Arc.

То есть теперь мы наконец можем реализовать глобальную функцию pause (или, по крайней мере, отложить решение проблемы на подольше):

fn pause() {
  if let Some(ctx) = SharedContext::get() {
    ctx.pause()
  }
}
impl SharedContext {
  fn pause(&self) {
    // :(
  }
}

Ладно, а что делать дальше? Нам как-то нужно скоординировать контролирующий поток и управляемый поток. И ещё нам нужен какой-нибудь механизм уведомлений, чтобы управляемый поток знал, когда он может продолжить работу. Самым грубым решением здесь будет пара из мьютекса, защищающего какое-то состояние, и условной переменной. Мьютекс охраняет состояние, которым можно манипулировать из любого потока. Условную переменную можно использовать, чтобы сигнализировать об изменениях.

struct SharedContext {
  state: Mutex<State>,
  cv: Condvar,
}
struct State {
  // ?
}

Так, похоже, у меня кончаются эмодзи. Больше не осталось слоёв косвенности или инфраструктуры, так что нам нужно написать реальный код, который и реализует работу с паузой. Допустим, state отслеживает состояние управляемого потока, который может или работать, или быть на паузе:

#[derive(PartialEq, Eq, Default)]
enum State {
  #[default]
  Running,
  Paused,
}

А дальше идёт логика функции паузы — меняем состояние с Running на Paused, уведомляем контролирующий поток, что мы находимся в состоянии Paused, и ждём, пока контролирующий поток не вернёт состояние обратно в Running:

impl SharedContext {
  fn pause(&self) {
    let mut guard = self.state.lock().unwrap();
    assert_eq!(*guard, State::Running);
    *guard = State::Paused;
    self.cv.notify_all();
    while *guard == State::Paused {
      guard = self.cv.wait(guard).unwrap();
    }
    assert_eq!(*guard, State::Running);
  }
}

Примечание: API языка Rust для условных переменных прекрасен. Condvar — это сложно, я не полностью понимал их, пока не изучил сигнатуры функций Rust. Обратите внимание, что функция wait получает в качестве аргумента guard мьютекса и возвращает тоже guard мьютекса. Это защищает нас от гонок логики и подталкивает к стандартному паттерну применения condvar:

Сначала мы блокируем мьютекс вокруг общего состояния. Затем проверяем, нужное ли это состояние. Если это так, то отлично, мы делаем необходимое нам и разблокируем мьютекс. Если нет, то тогда, всё ещё блокируя мьютекс, мы ждём условную переменную. Это значит, что мьютекс разблокируется и другие потоки получают шанс изменить общее состояние. Когда они меняют его и уведомляют condvar, наш поток просыпается и получает обратно заблокированный мьютекс (но состояние теперь отличается). Из-за возможности ложных пробуждений нам нужно тщательно проверять состояние и быть готовыми вернуться обратно к ожиданию.

Естественно, существует вспомогательная функция, инкапсулирующая весь паттерн:

impl SharedContext {
  fn pause(&self) {
    let mut guard = self.state.lock().unwrap();
    assert_eq!(*guard, State::Running);
    *guard = State::Paused;
    self.cv.notify_all();
    guard = self
      .cv
      .wait_while(guard, |state| *state == State::Paused)
      .unwrap();
    assert_eq!(*guard, State::Running)
  }
}

Отлично, это уже выглядит как разумная реализация pause. Давайте перейдём к managed_thread::spawn:

fn spawn<'scope, T: 'scope + Send>(
  scope: &Scope<'scope, '_>,
  state: T,
) {
  // ? ? ?? ??? ?????
}

Здесь должно произойти довольно многое:

  • Как уже выяснилось, мы будем порождать поток (области видимости), так что нам нужен параметр scope с его тремя сроками жизни. Я не знаю, как это работает, поэтому просто сделаю всё по документации!
  • Мы будем возвращать некий дескриптор, который можно будет использовать для включения и отключения паузы управляемого потока. И этот дескриптор будет параметризирован по сроку жизни того же 'scope, потому что будет привязан к реальному дескриптору join.
  • Мы будем передавать нашему новому потоку универсальное состояние, и это состояние должно быть Send и ограничено тем же сроком жизни, что и поток области видимости.
  • Внутри мы будем порождать поток, и нам нужно подготовить поток INSTANCE, локальный для этого потока.
  • И будет предусмотрительно поместить ссылку на этот SharedContext в возвращаемый нами дескриптор.

То есть сделать нам предстоит многое. Давайте приступим:

struct ManagedHandle<'scope> {
  inner: std::thread::ScopedJoinHandle<'scope, ()>,
  ctx: Arc<SharedContext>,
}
fn spawn<'scope, T: 'scope + Send>(
  scope: &'scope Scope<'scope, '_>,
  state: T,
) -> ManagedHandle<'scope> {
  let ctx: Arc<SharedContext> = Default::default();
  let inner = scope.spawn({
    let ctx = Arc::clone(&ctx);
    move || {
      SharedContext::set(ctx);
      drop(state); // TODO: ¿
    }
  });
  ManagedHandle { inner, ctx }
}

Порождаемая нами, по сути, no-op-функция выглядит подозрительно. Позже мы это исправим! Давайте для начала реализуем is_paused и unpause! Они должны быть относительно простыми. Для is_paused нам достаточно заблокировать мьютекс и проверить состояние:

impl ManagedHandle<'_> {
  pub fn is_paused(&self,) -> bool {
    let guard = self.ctx.state.lock().unwrap();
    *guard == State::Paused
  }
}

Для unpause нам нужно дополнительно поменять состояние обратно на Running и уведомить другой поток:

impl ManagedHandle<'_> {
  pub fn unpause(&self) {
    let mut guard = self.ctx.state.lock().unwrap();
    assert_eq!(*guard, State::Paused);
    *guard = State::Running;
    self.ctx.cv.notify_all();
  }
}

Но я думаю, это не совсем корректно. Догадываетесь, почему?

При такой реализации после unpause контролирующий и управляемый потоки будут работать конкурентно. А это приводит к недетерминированности — той самой проблеме, которой мы и пытались избежать! В частности, если вызвать is_paused сразу после того, как выполнен unpause потока, вам с большой вероятностью вернётся false, потому что другой поток всё ещё будет работать. Но он может и попасть на следующий вызов pause, так что в зависимости от тайминга можно получить true.

Мы же хотим добиться полного устранения неуправляемой конкурентности. То есть в любой момент времени должен выполняться только один поток (контролирующий или управляемый). Так что правильной семантикой unpause будет разблокирование управляемого потока с последующей блокировкой контролирующего потока, пока для управляемого не наступит следующая пауза!

impl ManagedHandle<'_> {
  pub fn unpause(&self) {
    let mut guard = self.ctx.state.lock().unwrap();
    assert_eq!(*guard, State::Paused);
    *guard = State::Running;
    self.ctx.cv.notify_all();
    guard = self
      .ctx
      .cv
      .wait_while(guard, |state| *state == State::Running)
      .unwrap();
  }
}

Итак, теперь мы можем порождать управляемый поток, приостанавливать и продолжать его исполнение. Но пока он ничего не делает. Далее нам нужно реализовать прямую отправку контролирующим потоком произвольного замыкания управляемому, чтобы заставить его что-то делать:

impl<'scope> ManagedHandle<'scope> {
  pub fn submit<F: FnSomething>(&self, f: F)
}

Давайте разберёмся с ограничениями этой FnSomething. Мы будем передавать эту f управляемому потоку и выполнять её там один раз, поэтому FnOnce. Она пересекает границу потоков, так что должна быть + Send. А поскольку мы используем потоки области видимости, она не должна быть 'static, достаточно просто 'scope. Более того, в этом управляемом потоке f будет иметь исключительный доступ к состоянию потока T. Итак, получается:

impl<'scope> ManagedHandle<'scope> {
  pub fn submit<F: FnOnce(&mut T) + Send + 'scope>(self, f: F)
}

Реализовать это будет немного сложно. Для начала нам потребуется какой-то канал для перемещения функции. Затем, аналогично логике unpause, нам нужна синхронизация, чтобы контролирующий поток не продолжался, пока управляемый поток не начнёт выполнять f и не дойдёт до паузы (или, может быть, не завершит f). И ещё нам понадобится новое состояние Ready, потому что теперь есть две отдельные причины возможной блокировки управляемого потока — он может ждать события unpause или может ждать исполнения следующей f. Вот новый код:

#[derive(Default)]
enum State {
  #[default]
  Ready,
  Running,
  Paused,
}
struct ManagedHandle<'scope, T> {
  inner: std::thread::ScopedJoinHandle<'scope, ()>,
  ctx: Arc<SharedContext>,
  sender: mpsc::Sender<Box<dyn FnOnce(&mut T) + 'scope + Send>>,
}
pub fn spawn<'scope, T: 'scope + Send>(
  scope: &'scope Scope<'scope, '_>,
  mut state: T,
) -> ManagedHandle<'scope, T> {
  let ctx: Arc<SharedContext> = Default::default();
  let (sender, receiver) =
    mpsc::channel::<Box<dyn FnOnce(&mut T) + 'scope + Send>>();
  let inner = scope.spawn({
    let ctx = Arc::clone(&ctx);
    move || {
      SharedContext::set(Arc::clone(&ctx));
      for f in receiver {
        f(&mut state);
        let mut guard = ctx.state.lock().unwrap();
        assert_eq!(*guard, State::Running);
        *guard = State::Ready;
        ctx.cv.notify_all()
      }
    }
  });
  ManagedHandle { inner, ctx, sender }
}
impl<'scope, T> ManagedHandle<'scope, T> {
  pub fn submit<F: FnOnce(&mut T) + Send + 'scope>(&self, f: F) {
    let mut guard = self.ctx.state.lock().unwrap();
    assert_eq!(*guard, State::Ready);
    *guard = State::Running;
    self.sender.send(Box::new(f)).unwrap();
    guard = self
      .ctx
      .cv
      .wait_while(guard, |state| *state == State::Running)
      .unwrap();
  }
}

Изменения в коде

Последний оставшийся кусок головоломки — это функция join. Она почти стандартная! Сначала мы закрываем нашу сторону канала. Это служит естественным стоп-сигналом для другого потока, так что он выполняет выход. Что в свою очередь позволяет нам присоединиться к нему. Небольшая проблема здесь в том, что поток может быть поставлен на паузу, когда мы пытаемся к нему присоединиться, так что нам предварительно нужно снять его с паузы:

impl<'scope, T> ManagedHandle<'scope, T> {
  pub fn join(self) {
    while self.is_paused() {
      self.unpause();
    }
    drop(self.sender);
    self.inner.join().unwrap();
  }
}

А теперь соединяем всё вместе!

Вспомогательная библиотека managed_thread.rs:

use std::{
  cell::RefCell,
  sync::{atomic::Ordering, mpsc, Arc, Condvar, Mutex},
  thread::Scope,
};
#[derive(Default)]
pub struct AtomicU32 {
  inner: std::sync::atomic::AtomicU32,
}
impl AtomicU32 {
  pub fn load(&self, ordering: Ordering) -> u32 {
    pause();
    let result = self.inner.load(ordering);
    pause();
    result
  }
  pub fn store(&self, value: u32, ordering: Ordering) {
    pause();
    self.inner.store(value, ordering);
    pause();
  }
}
fn pause() {
  if let Some(ctx) = SharedContext::get() {
    ctx.pause()
  }
}
#[derive(Default)]
struct SharedContext {
  state: Mutex<State>,
  cv: Condvar,
}
#[derive(Default, PartialEq, Eq, Debug)]
enum State {
  #[default]
  Ready,
  Running,
  Paused,
}
thread_local! {
  static INSTANCE: RefCell<Option<Arc<SharedContext>>> =
    RefCell::new(None);
}
impl SharedContext {
  fn set(ctx: Arc<SharedContext>) {
    INSTANCE.with(|it| *it.borrow_mut() = Some(ctx));
  }
  fn get() -> Option<Arc<SharedContext>> {
    INSTANCE.with(|it| it.borrow().clone())
  }
  fn pause(&self) {
    let mut guard = self.state.lock().unwrap();
    assert_eq!(*guard, State::Running);
    *guard = State::Paused;
    self.cv.notify_all();
    guard = self
      .cv
      .wait_while(guard, |state| *state == State::Paused)
      .unwrap();
    assert_eq!(*guard, State::Running)
  }
}
pub struct ManagedHandle<'scope, T> {
  inner: std::thread::ScopedJoinHandle<'scope, ()>,
  sender: mpsc::Sender<Box<dyn FnOnce(&mut T) + 'scope + Send>>,
  ctx: Arc<SharedContext>,
}
pub fn spawn<'scope, T: 'scope + Send>(
  scope: &'scope Scope<'scope, '_>,
  mut state: T,
) -> ManagedHandle<'scope, T> {
  let ctx: Arc<SharedContext> = Default::default();
  let (sender, receiver) =
    mpsc::channel::<Box<dyn FnOnce(&mut T) + 'scope + Send>>();
  let inner = scope.spawn({
    let ctx = Arc::clone(&ctx);
    move || {
      SharedContext::set(Arc::clone(&ctx));
      for f in receiver {
        f(&mut state);
        let mut guard = ctx.state.lock().unwrap();
        assert_eq!(*guard, State::Running);
        *guard = State::Ready;
        ctx.cv.notify_all()
      }
    }
  });
  ManagedHandle { inner, ctx, sender }
}
impl<'scope, T> ManagedHandle<'scope, T> {
  pub fn is_paused(&self) -> bool {
    let guard = self.ctx.state.lock().unwrap();
    *guard == State::Paused
  }
  pub fn unpause(&self) {
    let mut guard = self.ctx.state.lock().unwrap();
    assert_eq!(*guard, State::Paused);
    *guard = State::Running;
    self.ctx.cv.notify_all();
    guard = self
      .ctx
      .cv
      .wait_while(guard, |state| *state == State::Running)
      .unwrap();
  }
  pub fn submit<F: FnOnce(&mut T) + Send + 'scope>(&self, f: F) {
    let mut guard = self.ctx.state.lock().unwrap();
    assert_eq!(*guard, State::Ready);
    *guard = State::Running;
    self.sender.send(Box::new(f)).unwrap();
    guard = self
      .ctx
      .cv
      .wait_while(guard, |state| *state == State::Running)
      .unwrap();
  }
  pub fn join(self) {
    while self.is_paused() {
      self.unpause();
    }
    drop(self.sender);
    self.inner.join().unwrap();
  }
}

Тестируемая система, не совсем атомный счётчик:

use std::sync::atomic::Ordering::SeqCst;
#[cfg(test)]
use managed_thread::AtomicU32;
#[cfg(not(test))]
use std::sync::atomic::AtomicU32;
#[derive(Default)]
pub struct Counter {
  value: AtomicU32,
}
impl Counter {
  pub fn increment(&self) {
    let value = self.value.load(SeqCst);
    self.value.store(value + 1, SeqCst);
  }
  pub fn get(&self) -> u32 {
    self.value.load(SeqCst)
  }
}

И сам тест:

#[test]
fn test_counter() {
  arbtest::arbtest(|rng| {
    eprintln!("begin trace");
    let counter = Counter::default();
    let mut counter_model: u32 = 0;
    std::thread::scope(|scope| {
      let t1 = managed_thread::spawn(scope, &counter);
      let t2 = managed_thread::spawn(scope, &counter);
      let mut threads = [t1, t2];
      while !rng.is_empty() {
        for (tid, t) in threads.iter_mut().enumerate() {
          if rng.arbitrary()? {
            if t.is_paused() {
              eprintln!("{tid}: unpause");
              t.unpause()
            } else {
              eprintln!("{tid}: increment");
              t.submit(|c| c.increment());
              counter_model += 1;
            }
          }
        }
      }
      for t in threads {
        t.join();
      }
      assert_eq!(counter_model, counter.get());
      Ok(())
    })
  });
}

Выполнив его, мы выявим сбой:

---- test_counter stdout ----
begin trace
0: increment
1: increment
0: unpause
1: unpause
1: unpause
0: unpause
0: unpause
1: unpause
0: unpause
0: increment
1: unpause
0: unpause
1: increment
0: unpause
0: unpause
1: unpause
0: unpause
thread 'test_counter' panicked at src/lib.rs:56:7:
assertion `left == right` failed
  left: 4
 right: 3
arbtest failed!
    Seed: 0x4fd7ddff00000020

Хотя… этого же результата мы добились примерно в самом начале статьи при помощи обычных потоков! Но этот сбой гораздо полезнее. Во-первых, он воспроизводимый. Если я укажу тот же seed, то получу то же самое чередование (interleaving):

#[test]
fn test_counter() {
  arbtest::arbtest(|rng| {
    eprintln!("begin trace");
    ...
  })
    .seed(0x71aafcd900000020);
}

И этот тест полностью не зависит от машины! Если вы сами введёте этот seed, то получите точно такое же чередование. Так что если бы у меня возникли проблемы с его отладкой, я мог вам отправить это шестнадцатеричное значение в Zulip, и вы могли бы мне помочь!

Но и это ещё не всё: нам не нужно отлаживать этот сбой, можно его минимизировать!

#[test]
fn test_counter() {
  arbtest::arbtest(|rng| {
    eprintln!("begin trace");
    ...
  })
    .seed(0x71aafcd900000020)
    .minimize();
}

Таким образом я получил следующую трассировку минимизации:

begin trace
0: increment
1: increment
0: unpause
1: unpause
1: unpause
0: unpause
0: unpause
1: unpause
0: unpause
0: increment
1: unpause
0: unpause
1: increment
0: unpause
0: unpause
1: unpause
0: unpause
seed 0x4fd7ddff00000020, seed size 32, search time 106.00ns
begin trace
0: increment
1: increment
0: unpause
0: unpause
1: unpause
0: unpause
1: unpause
0: unpause
1: unpause
1: unpause
1: increment
seed 0x540c0c1c00000010, seed size 16, search time 282.16µs
begin trace
0: increment
1: increment
0: unpause
1: unpause
1: unpause
1: unpause
seed 0x084ca71200000008, seed size 8, search time 805.74µs
begin trace
0: increment
1: increment
0: unpause
1: unpause
seed 0x5699b19400000004, seed size 4, search time 1.44ms
begin trace
0: increment
1: increment
0: unpause
1: unpause
seed 0x4bb0ea5c00000002, seed size 2, search time 4.03ms
begin trace
0: increment
1: increment
0: unpause
1: unpause
seed 0x9c2a13a600000001, seed size 1, search time 4.31ms
minimized
seed 0x9c2a13a600000001, seed size 1, search time 100.03ms

То есть в результате у нас получился вот такой крошечный минимальный пример:

#[test]
fn test_counter() {
  arbtest::arbtest(|rng| {
    eprintln!("begin trace");
    ...
  })
    .seed(0x9c2a13a600000001);
}

begin trace
0: increment
1: increment
0: unpause
1: unpause

И именно так нужно правильно тестировать конкурентные структуры данных.

▍ Постскриптум


Разумеется, это всего лишь искусственный пример, но его можно расширить. Например, пока наш AtomicU32 просто выполняет делегирование реальному типу. Но вместо этого можно для каждого атомного типа хранить множество записываемых значений и при чтении возвращать произвольное записанное значение, согласованное со слабой моделью памяти.

Ещё можно умнее подойти к исследованию чередований. Вместо того, чтобы перемежать потоки случайным образом, как делаем мы, можно попробовать применить методики проверки модели и доказать, что мы рассмотрели все важные чередования.

Или можно применить методику из Generate All The Things и перечислить все чередования скажем, до пяти инкрементов. Действительно, почему бы и не сделать так?

$ cargo add exhaustigen


#[test]
fn exhaustytest() {
  let mut g = exhaustigen::Gen::new();
  let mut interleavings_count = 0;
  while !g.done() {
    interleavings_count += 1;
    let counter = Counter::default();
    let mut counter_model: u32 = 0;
    let increment_count = g.gen(5) as u32;
    std::thread::scope(|scope| {
      let t1 = managed_thread::spawn(scope, &counter);
      let t2 = managed_thread::spawn(scope, &counter);
      'outer: while t1.is_paused()
        || t2.is_paused()
        || counter_model < increment_count
      {
        for t in [&t1, &t2] {
          if g.flip() {
            if t.is_paused() {
              t.unpause();
              continue 'outer;
            }
            if counter_model < increment_count {
              t.submit(|c| c.increment());
              counter_model += 1;
              continue 'outer;
            }
          }
        }
        return for t in [t1, t2] {
          t.join()
        };
      }
      assert_eq!(counter_model, counter.get());
    });
  }
  eprintln!("interleavings_count = {:?}", interleavings_count);
}

Структура теста более-менее осталась такой же, только теперь нам нужно убедиться в отсутствии «фальшивых» итераций и проверять, что мы всегда или снимаем поток с паузы, или передаём инкремент.

Естественно, мы снова обнаружим тот же баг:

thread 'exhaustytest' panicked at src/lib.rs:103:7:
assertion `left == right` failed
  left: 2
 right: 1

Но здорово то, что если мы решим проблему при помощи атомного инкремента…

impl AtomicU32 {
  pub fn fetch_add(
    &self,
    value: u32,
    ordering: Ordering,
  ) -> u32 {
    pause();
    let result = self.inner.fetch_add(value, ordering);
    pause();
    result
  }
}
impl Counter {
  pub fn increment(&self) {
    self.value.fetch_add(1, SeqCst);
  }
}

… то сможем убрать из теста достаточно специфичные утверждения о корректности того, что корректна любая последовательность из максимум пяти инкрементов:

$ t cargo t -r -- exhaustytest --nocapture
running 1 test
all 81133 interleavings are fine!
test exhaustytest ... ok
real 8.65s
cpu  8.16s (2.22s user + 5.94s sys)
rss  63.91mb

И последняя мелочь: вспомним, что наш PBT минимизировал первую найденную им последовательность…

begin trace
0: increment
1: increment
0: unpause
1: unpause
1: unpause
0: unpause
0: unpause
1: unpause
0: unpause
0: increment
1: unpause
0: unpause
1: increment
0: unpause
0: unpause
1: unpause
0: unpause
thread 'test_counter' panicked at src/lib.rs:56:7:
assertion `left == right` failed
  left: 4
 right: 3
arbtest failed!
    Seed: 0x4fd7ddff00000020

… до

begin trace
0: increment
1: increment
0: unpause
1: unpause
thread 'test_counter' panicked at src/lib.rs:57:7:
assertion `left == right` failed
  left: 2
 right: 1
arbtest failed!
    Seed: 0x9c2a13a600000001

Но мы не применяли shrinking! Как это возможно? Ну, строго говоря, это уже выходит за рамки темы нашего поста. Я уже писал об этом в другом блоге.

Но я всё же объясню ещё раз! Хитрость заключается в упрощённой методике hypothesis. Использованная в этом посте библиотека PBT arbtest основана на знакомом интерфейсе PRNG:

arbtest::arbtest(|rng| {
  let random_int: usize = rng.int_in_range(0..=100)?;
  let random_bool: bool = rng.arbitrary()?;
  Ok(())
});

Но здесь есть одна тонкость! Это конечный PRNG. Поэтому если попросить его бросить монетку, он вернёт орла, а в следующий раз вы можете получить решку. Но если продолжать запрашивать новые значения, то в какой-то момент он выдаст Err(OutOfEntropy). Именно поэтому используются все эти ? и внешний цикл while !rng.is_empty() {.

Иными словами, как только у теста начинает заканчиваться энтропия, он выполняется по упрощённой схеме и завершается. А это означает, что при снижении величины доступной энтропии тест становится короче, и это работает вне зависимости от уровня сложности логики внутри теста!

Страшное слово «энтропия» здесь на самом деле означает, что PRNG внутри устроен просто как &mut &[u8]. То есть это срез случайных байтов, который укорачивается каждый раз, когда вы запрашиваете случайное число. И чем короче исходный срез, тем проще становится тест. Минимизация может быть и такой простой!

Исходный код для этой статьи можно найти здесь.

Telegram-канал со скидками, розыгрышами призов и новостями IT ?

Комментарии (3)


  1. mentin
    10.07.2024 20:33

    Это не честно, читаешь, мысленно готовишь комментарии, и читая дальше видишь что на все уже есть ответ в следующей итерации :). Классная статья!


  1. Panzerschrek
    10.07.2024 20:33

    Мне кажется, описанный в статье пример реализации, где создаются реальные потоки, которые потом жёстко синхронизируются между собой, является чересчур сложным. Гораздо эффективнее можно было бы эмулировать несколько потоков, используя один реальный поток ОС - переключая стек и указатель потока команд в местах с pause. Такой подход, правда, на чистом Rust не реализовать - нужны будут ассемблерные вставки. Да и подобные (псевдо)потоки придётся отдельным образом создавать.


    1. KanuTaH
      10.07.2024 20:33
      +2

      Ну, начнём с того, что предлагаемым вами образом невозможно протестировать код на наличие проблем, связанных с отсутствием или неверным порядком синхронизирующих операций load/store на архитектурах с weak memory model, т.к. проблемы такого рода будут проявляться только на реальных потоках, одновременно выполняющихся на нескольких физических ядрах. Впрочем, лишний мутекс у автора тоже вносит некоторое искажение в этом плане.