Асинхронный код – сложный. Все это знают. Писать асинхронные тесты – еще сложнее. Недавно я починил мигающий (flaky) тест и хотел бы поделиться некоторыми мыслями по поводу написания асинхронных тестов.

В этой статье мы разберем общую для асинхронных тестов проблему – как выстроить потоки в определенном порядке, заставляя отдельные операции отдельных потоков выполняться раньше других операций других потоков. В обычных условиях мы не стремимся выстраивать принудительный порядок исполнения разных потоков, поскольку это противоречит самой причине использования потоков, которая заключается в том, чтобы обеспечить параллельность и позволить процессору выбирать наилучший порядок исполнения, исходя из имеющихся в наличии ресурсов и состояния приложения. Но определённый порядок может потребоваться во время тестирования для обеспечения стабильности тестов.



Тестируем throttler


Throttler (Ограничитель) – это класс, отвечающий за ограничение количества одновременных операций, выполняемых с некоторым ресурсом (например, пул соединения, сетевой буфер или ресурсоемкие операции процессора). В отличие от других инструментов синхронизации, роль ограничителя заключается в том, чтобы запросы, превышающие квоту, завершались ошибкой немедленно, без ожидания. Быстрое завершение важно, поскольку альтернатива, ожидание, потребляет ресурсы – порты, потоки и память.

Вот простая реализация ограничителя (в своей основе это обертка вокруг класса Semaphore; в реальном мире тут может встречаться ожидание, повторные попытки и т.д.):
class ThrottledException extends RuntimeException("Throttled!")
class Throttler(count: Int) {
 private val semaphore = new Semaphore(count)
 def apply(f: => Unit): Unit = {
   if (!semaphore.tryAcquire()) throw new ThrottledException
   try {
     f
   } finally {
     semaphore.release()
   }
 }
}

Начнем с простого юнит-теста: тестируем ограничитель для одного потока (для тестов мы используем библиотека specs2). В этом тесте мы проверяем, что можем выполнить больше последовательных вызовов, чем установленное для ограничителя максимальное количество одновременных вызовов (переменная maxCount ниже). Заметьте, что поскольку мы используем единственный поток, мы не проверяем способность ограничителя к «быстрому завершению», так как мы не нагружаем ограничитель. По сути, мы проверяем лишь тот факт, что пока ограничитель не нагружен, он не прерывает выполнение операций.
class ThrottlerTest extends Specification {
  "Throttler" should {
    "execute sequential" in new ctx {
      var invocationCount = 0
      for (i <- 0 to maxCount) {
        throttler {
          invocationCount += 1
        }
      }
      invocationCount must be_==(maxCount + 1)
    }
  }
  trait ctx {
    val maxCount = 3
    val throttler = new Throttler(maxCount)
  }
}


Тестируем ограничитель асинхронно


В предыдущем тесте мы не нагружали ограничитель просто потому, что это невозможно сделать с единственным потоком. Поэтому наш следующий шаг заключается в проверке работоспособности ограничителя в многопоточном окружении.

Подготовка:
val e = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(e)
private val waitForeverLatch = new CountDownLatch(1)
 
override def after: Any = {
 waitForeverLatch.countDown()
 e.shutdownNow()
}
 
def waitForever(): Unit = try {
 waitForeverLatch.await()
} catch {
 case _: InterruptedException =>
 case ex: Throwable => throw ex
}

Объект ExecutionContext используется для конструкции Future; метод waitForever держит поток до момента, пока не обнулится счетчик waitForeverLatch – перед окончанием теста. В следующей за этим функции мы закрываем ExecutorService.

Упрощенный способ проверки многопоточного поведения ограничителя выглядит так:
"throw exception once reached the limit [naive,flaky]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }
  throttler {} must throwA[ThrottledException]
}

Здесь мы создаем потоки в количестве равном maxCount. В каждом потоке мы вызываем функцию waitForever, которая ждет до окончания теста. Затем мы пытаемся выполнить еще одну операцию, чтобы обойти ограничитель – maxCount +1. Предполагается, что в этом месте мы должны получить исключение ThrottledException. Однако, хотя мы ждем исключения, оно не наступает. Последний вызов ограничителя (с ожиданием) может произойти до запуска любого из future (это приводит к тому, что исключение бросается в этом экземпляре future, но не в рамках ожидания).

Проблема с вышеприведенным тестом в том, что мы не знаем о запуске всех потоков и ждем в функции waitForever до того, как попробуем обойти ограничитель, ожидая, что ограничитель бросит исключение. Чтобы исправить это, нам нужно каким-то образом подождать, пока все потоки future будут запущены. Вот подход, знакомый многим из нас: просто добавить вызов метода sleep с некоей разумной длительностью.
"throw exception once reached the limit [naive, bad]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }
  Thread.sleep(1000)
  throttler {} must throwA[ThrottledException]
}

Хорошо, теперь наш тест будет почти всегда проходить. Но это неправильный подход, как минимум по двум причинам:
Длительность теста будет в точности равна установленной нами «разумной длительности».
В очень редких ситуациях, например, при высокой загруженности машины, этой разумной длительности может не хватить.

Если вы все еще сомневаетесь, поищите в Google иные причины.
Более правильный подход заключается в синхронизации старта наших потоков (future) и нашего ожидания.

Будем использовать класс CountDownLatch из пакета java.util.concurrent:
"throw exception once reached the limit [working]" in new ctx {
  val barrier = new CountDownLatch(maxCount)
 
  for (i <- 1 to maxCount) {
    Future {
      throttler {
        barrier.countDown()
        waitForever()
      }
    }
  }
 
  barrier.await(5, TimeUnit.SECONDS) must beTrue
 
  throttler {} must throwA[ThrottledException]
}

Мы используем CountDownLatch для барьерной синхронизации. Метод await блокирует главный поток до обнуления счетчика latch. При запуске других потоков (будем обозначать эти другие потоки как futures), каждый из этих futures вызывает барьерный метод countDown, чтобы снизить значение счетчика latch на единицу. Когда счетчик latch становится равным нулю, все futures располагаются внутри метода waitForever.
К этому моменту мы убедились, что ограничитель нагружен и содержит количество потоков, равное maxCount. Попытка другого потока задействовать ограничитель приведет к исключению. Таким образом, мы получили детерминированный порядок исполнения, при котором мы можем проверить поведение ограничителя в главном потоке. Главный поток может и будет продолждать выполнение в этой точке (счетчик barrier достигает нуля и CountDownLatch освобождает ожидающий поток).
Мы используем несколько завышенный таймаут во избежание бесконечной блокировки, если произойдет что-то непредвиденное. Если что-то произойдет – тест упадет. Этот таймаут не повлияет на длительность теста, поскольку, если ничего непредвиденного не произойдет, нам не надо его ждать.

Напоследок


При тестировании асинхронного кода достаточно часто есть потребность в определенном порядке потоков для определенного теста. Если не применять никакой синхронизации, получим нестабильные тесты, которые иногда отрабатывают, а иногда падают. Использование Thread.sleep снижает нестабильность тестов, но не решает проблемы. В большинстве случаев, когда нам необходимо определять порядок потоков в тесте, мы можем использовать CountDownLatch вместо Thread.sleep. Преимущество CountDownLatch в том, что мы можем указать, когда сбросить ожидание (удержание) потока, что дает нам два важных преимущества: детерминированное определение порядка и, благодаря этому, более стабильные тесты и более быстрое прохождение тестов. Даже для обычного ожидания, например, функции waitForever, мы могли бы использовать что-нибудь вроде Thread.sleep(Long.MAX_VALUE), но ненадежных подходов лучше всегда избегать.

Разработчик конструктора сайтов Wix,
Дмитрий Команов
Оригинал статьи: блог инженеров компании Wix
Поделиться с друзьями
-->

Комментарии (4)


  1. rumoku
    21.06.2016 15:16

    «запросы, превышающие квоту, завершались ошибкой немедленно» — это всего лишь один из подходов тротлинга. Абсолютно допустимой альтернативой является блокировка и ожидания освобождения ресурсов. Такой сценарий между прочим является более интересным.


  1. sshikov
    21.06.2016 19:31
    +2

    При тестировании асинхронного кода достаточно часто есть потребность в определенном порядке потоков для определенного теста. Если не применять никакой синхронизации, получим нестабильные тесты, которые иногда отрабатывают, а иногда падают.


    Мне кажется, или вы перевернули все с ног на голову? Если тесты падают при случайном порядке выполнения потоков — эначит вы тестируете что-то, что зависит от этого порядка. Поскольку в рабочей программе порядок не определен — она заведомо может работать не так. И что толку в таких тестах?

    Представьте, что вы тестируете просто цикл, некий итерационный численный алгоритм, при этом ваш тест зависит от числа повторений. Мне кажется, это какой-то неправильный тест, нет?

    Нужно поискать инвариант, который не зависит от порядка выполнения, и проверить, что он правилен.


  1. Apathetic
    21.06.2016 23:34

    Опять костыль на КДПВ.


  1. Relaximus
    22.06.2016 12:20

    Вы не могли бы где-то пометить, что код представлен на Scala? Если кто-либо не знаком с синтаксисом, это будет удобно, чтобы найти нужную документацию.