Эта статья посвящена тому, как распределять задачи между конвейерами очередей, чтобы минимизировать общее время обработки, а также неожиданной связи между этим методом планирования и методом Томаса Джефферсона.

Предыстория и мотивация

Вы знаете, как на конвейерных линиях в производстве и в конвейерах инструкций в процессорах создаётся форма параллелизма, без нарушения детерминизма (порядок входных данных в конвейер сохраняется на выходе)?

Мы назовем это неявным параллелизмом через конвейеризацию. Неявным, потому что просто разделив задачу на этапы, мы автоматически получаем параллелизм. Когда этот метод применяется в программном обеспечении, он позволяет нам писать полностью последовательные программы (для каждого этапа), при этом эффективно используя параллельное оборудование.

Как это работает? Каждый этап работает независимо (на одном процессоре/ядре), и этапы соединены через очереди. Когда первый процессор/ядро завершает первый этап первого элемента, он передает его на второй этап и продолжает работать над первым этапом второго элемента. По мере того как очереди между этапами заполняются, мы достигаем параллелизма, сохраняя детерминизм (выходы поступают в том же порядке, что и входы).

Вот пример конвейера на фабрике по производству бутылок:

Каждый этап соединен конвейерными лентами (очередями) и работает параллельно с другими, например, после наполнения первой бутылки можно наполнять вторую, одновременно закрывая первую бутылку, и т. д.

Если какой‑то этап выполняется медленно, то входную очередь на этом этапе можно разделить или разбить на части, добавив еще один обработчик на этот этап. При этом каждый второй входной элемент будет отправляться новому обработчику, что эффективно почти удвоит пропускную способность.

В этой статье я хотел бы обсудить следующий вопрос: как лучше всего распределить процессоры/ядра между этапами? Например, если у нас есть только два процессора/ядра, но три этапа, то не имеет смысла выделять один из них на последний этап до тех пор, пока хотя бы часть элементов не будет обработана на втором этапе.

Сначала я хочу разработать библиотеку, чтобы протестировать эти концепции, но в долгосрочной перспективе я хотел бы создать язык программирования, который использует эти идеи, чтобы выяснить, сможем ли мы создать что‑то, что будет масштабироваться по мере увеличения числа доступных процессоров/ядер.

Вдохновение и предыдущие работы

Хотя примеры конвейеризации в производстве существовали еще до Генри Форда, похоже, именно тогда она получила широкое распространение. Википедия гласит:

The assembly line, driven by conveyor belts, reduced production time for a Model T to just 93 minutes by dividing the process into 45 steps. Producing cars quicker than paint of the day could dry, it had an immense influence on the world.

Для сравнения, до внедрения конвейера на производство автомобиля уходило около 12,5 часов.

Процессоры — это еще один пример использования конвейеров с целью ускорения обработки инструкций. Конвейер может выглядеть следующим образом: получение инструкции, получение операндов, выполнение инструкции и, наконец, запись результатов.

Учитывая колоссальный успех этой концепции как в производстве, так и в аппаратном обеспечении, можно было бы ожидать, что конвейеризация станет популярной и в программном обеспечении? Однако, по каким‑то причинам, этого пока не произошло, хотя у этой идеи есть сторонники.

Джим Грей говорил о параллелизме в программных конвейерах и разделении задач в своём интервью по случаю получения премии Тьюринга. Языки программирования на основе потоков данных, в частности программирование на основе потоков данных Пола Моррисона, используют эту идею. Паттерн LMAX Disruptor также основан на конвейерном параллелизме и поддерживает то, что Джим называет параллелизмом разделения. Одним из источников, на который ссылается Disruptor, является статья SEDA: An Architecture for Well‑Conditioned, Scalable Internet Services (2001), где также говорится о конвейерах и динамическом распределении потоков по этапам. Недавно, изучая работы Джима, я обнаружил, что движки баз данных тоже реализуют нечто похожее на конвейерный параллелизм. Пример движков баз данных, использующих эту технику, приведен в статье: Morsel‑Driven Parallelism.

Эти примеры программного конвейерного параллелизма вдохновили меня задуматься об этой концепции. Однако по‑настоящему я начал размышлять над тем, как спроектировать язык программирования для более простого применения конвейеризации в софте, только после того, как прочитал следующее высказывание Мартина Томпсона, одного из разработчиков LMAX Disruptor:

If there’s one thing I’d say to the Erlang folks, it’s you got the stuff right from a high-level, but you need to invest in your messaging infrastructure so it’s super fast, super efficient and obeys all the right properties to let this stuff work really well.

Услышав анекдот Джо Армстронга о том, что немодифицированная программа на Erlang работает на 64-ядерной машине только в 33 раза быстрее, а не в 64 раза, как ожидало руководство Ericsson, я начал думать о том, как можно разработать язык программирования, чтобы облегчить конвейеризацию в программах.

Я начал исследовать эту тему в двух предыдущих статьи, а также писал о эластичном масштабировании одного этапа вверх и вниз, но в этом посте мы рассмотрим более глобальный подход.

Общая картина

Система состоит из трех частей: конвейера, обработчиков и планировщика:

Планировщик следит за конвейером, оценивая длину входных очередей для каждого этапа и среднее время обслуживания на этом этапе. Используя эти данные, он рассчитывает, как распределить доступные обработчики.

Алгоритм распределения обработчиков работает следующим образом:

  1. Генерируются все возможные конфигурации распределения обработчиков по этапам;

  2. Каждая конфигурация оценивается с использованием следующей формулы:\sum_{s}\frac{l_{s} \cdot t_{s}}{w_{s} + 1}​​​, где s — это этап, l_{s}​ — длина входной очереди на этапе s, t_{s} — среднее время обработки на этапе s, а w_{s} — количество обработчиков, выделенных на этот этап.

  3. Выбирается конфигурация с наименьшим значением, то есть та, где общее время обработки минимально.

Обработчики, обычно по одному на доступный процессор/ядро, обрабатывают партию входных данных на этапе, который им указывает планировщик, и затем сообщают об этом планировщику, после чего процесс повторяется до завершения потока входных данных.

Если мы увеличим масштаб и посмотрим на конвейер более подробно, то увидим, что он состоит из источника, N этапов и конечного узла:

Источник может быть файлом, сетевым сокетом, списком элементов, предоставленным пользователем, и т. д., из которого создаются входные данные для очереди первого этапа. Входные данные могут быть байтами с префиксом длины, необработанными байтами, разделенными новыми строками байтами, и т. д. Аналогично, конечный узел также может быть файлом, стандартным выходом или сокетом. Между источником и конечным узлом происходит основная обработка на разных этапах.

Прототип реализации

Надеюсь, что из картинки выше становиться ясно, что большая часть кода будет представлять из себя «обвязку» (соединение компонентов с помощью очередей). Самая интересная часть: как планировщик определяет, какую задачу поручить обработчику, после того как он завершит работу над предыдущей.

Начнем с представления конфигурации обработчиков, распределенных по пайплайну. Каждая стадия имеет имя или идентификатор, поэтому конфигурацию можно представить как пару идентификатор‑количество обработчиков, назначенных на эту стадию.

newtype Config = Config (Map StageId NumOfWorkers)
  deriving Show
type NumOfWorkers = Int

Начальная конфигурация - всем стадиям назначено ноль обработчиков:

initConfig :: [StageId] -> Config
initConfig stageIds =
  Config (Map.fromList (zip stageIds (replicate (length stageIds) 0)))

Реализация распределения обработчиков начинается с генерации всех возможных вариантов конфигураций, отбрасывания тех, которые назначают обработчики на завершенные стадии («завершенная» — стадия на которую больше не поступят данные), оценки всех конфигураций и выбора той, у которой наименьший балл.

allocateWorkers :: Int -> Map StageId QueueStats -> Set StageId -> Maybe Config
allocateWorkers cpus qstats done = case result of
  []                -> Nothing
  (cfg, _score) : _ -> Just cfg
  where
    result = sortBy (comparing snd)
               [ (cfg, sum (Map.elems (scores qstats cfg)))
               | cfg <- possibleConfigs cpus (Map.keys qstats)
               , not (allocatesDoneStages cfg done)
               ]

Все возможные конфигурации генерируются следующим образом:

possibleConfigs :: Int -> [StageId] -> [Config]
possibleConfigs cpus stages = map (Config . Map.fromList . zip stages) $ filter ((== cpus) . sum)
  [ foldl' (\ih i -> update i succ ih) (replicate (length stages) 0) slot
  | choice <- combinations [0.. (cpus + length stages - 1)] cpus
  , let slot = [ c - i | (i, c) <- zip [0.. ] choice ]
  ]
  where
    combinations :: [a] -> Int -> [[a]]
    combinations xs n = filter ((== n) . length) (subsequences xs)
    -- update i f xs = xs[i] := f (xs[i])
    update :: Int -> (a -> a) -> [a] -> [a]
    update i f = go [] i
      where
        go acc _ []       = reverse acc
        go acc 0 (x : xs) = reverse acc ++ f x : xs
        go acc n (x : xs) = go (x : acc) (n - 1) xs

Тогда как оценка производится следующим:

scores :: Map StageId QueueStats -> Config -> Map StageId Double
scores qss (Config cfg) = joinMapsWith score qss cfg
  where
    score :: QueueStats -> Int -> Double
    score qs workers =
      (fromIntegral (queueLength qs) * fromIntegral avgServiceTimePicos)
      /
      (fromIntegral workers + 1)
      where
        avgServiceTimePicos :: Word64
        avgServiceTimePicos
          | len == 0  = 1 -- XXX: What's the right value here?
          | otherwise = sum (serviceTimesPicos qs) `div` len
          where
            len :: Word64
            len = genericLength (serviceTimesPicos qs)

Небольшая вспомогательная функция для объединения хэш-таблиц:

joinMapsWith :: Ord k => (a -> b -> c) -> Map k a -> Map k b -> Map k c
joinMapsWith f m1 m2 = assert (Map.keys m1 == Map.keys m2) $
  Map.fromList
    [ (k, f x (m2 Map.! k))
    | (k, x) <- Map.toList m1
    ]

Последнее, что нам нужно сделать, это определить конфигурации, которые назначают обработчики на завершенные стадии:

allocatesDoneStages :: Config -> Set StageId -> Bool
allocatesDoneStages (Config cfg) done =
  any (\(stageId, numWorkers) -> stageId `Set.member` done && numWorkers > 0)
      (Map.toList cfg)

Запуск прототипа

Давайте завершим с парой примеров в REPL. Предположим, у нас есть два обработчика и два этапа (A и B). Этап A имеет три элемента в своей входной очереди (и это будут все входные данные, которые он получит), и ни один этап еще не завершен (это последний аргумент S.empty):

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 3 [])
                                , ("B", QueueStats 0 [])]) 
                    S.empty

(Конструктор QueueStats принимает длину входной очереди в качестве первого аргумента и список времени обработки в качестве второго аргумента.)

Если мы запустим приведённый выше код, то получим:

Just (Config (fromList [("A",2),("B",0)]))

Это означает, что оба обработчика должны быть выделены для этапа A. Предположим, что мы сделали это распределение, и после одной временной единицы оба обработчика завершили работу. Это значит, что в входной очереди теперь остался один элемент, в то время как на втором этапе (B) теперь два элемента в его входной очереди. Поскольку оба обработчика завершили работу, мы снова запускаем функцию распределения:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 1 [1,1])
                                , ("B", QueueStats 2 [])]) 
                    S.empty
Just (Config (fromList [("A",1),("B",1)]))

Это значит, что мы должны выделить по одному обработчику для каждого этапа. Если мы снова представим, что мы сделали это и они оба завершили работу после одной временной единицы, мы окажемся в ситуации, когда все три элемента были обработаны на первом этапе (A), и мы можем отметить его как завершенный, в то время как на втором этапе будет два элемента в его входной очереди:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 0 [1,1,1])
                                , ("B", QueueStats 2 [1])]) 
                    (S.fromList ["A"])
Just (Config (fromList [("A",0),("B",2)]))

В этом случае распределение обработчиков будет означать, что оба обработчика будут выделены для второго этапа. После того как обработчики завершат работу над этими элементами, второй этап также обработает все элементы, и мы завершили процесс:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 0 [1,1,1])
                                , ("B", QueueStats 0 [1,1,1])]) 
                    (S.fromList ["A", "B"])
Nothing

Неожиданная связь с Томасом Джефферсоном

Когда я разработал идею планирования, описанную выше, я обсудил ее с моим другом Даниэлем Густафссоном, который сразу же ответил: «Это немного напоминает метод Джефферсона» (по распределению мест в парламентах).

Вот как он работает:

После того как все голоса подсчитаны, для каждой партии вычисляются последовательные коэффициенты. Партия с наибольшим коэффициентом выигрывает одно место, и её коэффициент пересчитывается. Это повторяется до тех пор, пока не будет заполнено необходимое количество мест. Формула для вычисления коэффициента выглядит так:

quot = \frac{V}{s + 1}

где:

  • V — это общее количество голосов, которые получила партия, и

  • s — количество мест, которые партия уже получила, изначально 0 для всех партий».

Аналогия следующая:

  • партии: этапы в конвейере

  • места на партию: обработчики, выделенные на этап

  • голоса: «оценка» (длина входной очереди, умноженная на среднее время обработки)

  • раунды: общее количество обработчиков

Попробуем повторить пример, который мы рассматривали ранее, где этап A и этап B имели длину очереди 3 и 2 соответственно, но используя метод Джефферсона:

  1. В первом раунде партия/этап A получает 1 голос, в то время как партия/этап B получает 2 голоса, так что коэффициенты составляют \frac{1}{0 + 1} и \frac{2}{0 + 1} соответственно, что означает, что этап B выигрывает раунд и получает одно место.

  2. Во втором раунде мы получаем коэффициенты: \frac{1}{0 + 1} = 1 и \frac{2}{1 + 1} = 1 (обратите внимание, что здесь s = 1, поскольку этап/партия B уже выиграла место в предыдущем раунде). Это означает, что мы получаем ничью, и в этом случае, я полагаю, мы можем произвольно выбрать первую партию, чтобы наш пример совпадал с реализацией*.

Даниэль также объяснил, что, хотя Джефферсон предложил этот метод, он фактически не используется в США, но большинство стран Европы, включая парламент ЕС, применяют этот метод.

Заключение и будущая работа

Мы рассмотрели стратегию, как можно эластично масштабировать количество процессоров/ядер, выделенных для одного этапа в конвейере. Возможность сделать это будет полезна в следующих случаях:

  • Когда нагрузка на систему изменяется, и один этап внезапно становится медленнее другого; благодаря эластичности мы можем перераспределить ядра и поддерживать пропускную способность.

  • Когда нагрузка уменьшается, мы можем уменьшить масштаб и использовать ядра в других частях системы.

Мы также увидели, как метод распределения мест в парламенте, предложенный Томасом Джефферсоном, можно использовать для решения той же задачи. Эта неожиданная связь заставляет меня задуматься, где еще может всплыть этот алгоритм.

Мы все еще далеки от реализации рантайма параллельного языка программирования с использованием этих идей. В частности, текущая реализация использует простые конкурентные очереди для соединения этапов, что означает, что увеличение масштаба этапа не сохраняет детерминированность вывода. Это можно решить с помощью Disruptors, как я описывал в своей старой статье. Я собрал множество других задач, которые нужно выполнить, в отдельном файле. Если вас интересует что‑то из этого, не стесняйтесь связаться со мной.

Комментарии (0)