При выполнении параллельной программы, активно задействующей CPU, нам часто необходимо, чтобы пул потоков или процессов имел размер, сопоставимый с количеством ядер CPU на машине. Если потоков меньше, то вы не будете использовать все преимущества ядер, если больше, то программа начнёт работать медленнее, так как несколько потоков будет конкурировать за одно ядро. Ну, или такова ситуация в теории.

Как же проверить, сколько ядер есть у компьютера? И действительно ли это хороший совет?

Оказывается, на удивление сложно определить, сколько потоков выполнять:

  • В стандартной библиотеке Python есть множество API для получения этой информации, но ни одного из них недостаточно.

  • Хуже того, из-за таких функций CPU, как параллельность на уровне команд и одновременной многопоточности (Hyper-threading в CPU Intel), количество ядер, которое можно эффективно использовать, зависит от того кода, который напишете вы!

Давайте разберёмся, почему так сложно определить, сколько ядер CPU может использовать программа, а затем подумаем над потенциальным решением.

Получение количества ядер CPU из Python

Если вы читали документацию стандартной библиотеки Python, то знаете, что в ней есть стандартная функция os.cpu_count(), возвращающая «количество логических CPU». Что значит «логических»? Мы вернёмся к этому чуть позже.

В документации также говорится, что «len(os.sched_getaffinity(0)) получает количество логических CPU, которым ограничен вызывающий поток текущего процесса». Ограничение процесса конкретными ядрами обеспечивается привязкой планировщика (scheduler affinity).

К сожалению, этого API тоже недостаточно. Например, в Linux есть  API cgroups, используемый для реализации Docker и других контейнерных систем; он имеет множество способов ограничения нагрузки на CPU. В примере ниже мы ограничиваем CPU эквивалентом 2,25 ядер; механизм отличается, но результат будет схожим:

$ docker run -i -t --cpus=2.25 python:3.12-slim
Python 3.12.1 (main, Dec  9 2023, 00:21:37) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> os.cpu_count()
20
>>> len(os.sched_getaffinity(0))
20

Одновременно мы можем использовать эквивалент 2,25 ядер, но ни один API не знает об этом.

Что такое логический CPU?

Опции операционной системы — лишь начало наших проблем, но прежде чем рассматривать пример, мы должны понять, что такое физические и логические ядра CPU. В моём компьютере установлен процессор Intel i7-12700K, обладающий:

  • 12 физическими ядрами (8 высокопроизводительных ядер и 4 менее мощных).

  • 20 логическими ядрами.

Современные ядра CPU способны параллельно исполнять несколько команд. Но что произойдёт, если CPU «зависнет», ожидая загрузки каких-то данных из ОЗУ? Он не сможет выполнять никакую работу, пока этого не произойдёт.

Чтобы позволить использовать эти потенциально впустую растрачиваемые ресурсы, вычислительные ресурсы физического ядра CPU могут предоставляться операционной системе как несколько ядер. В моём CPU каждое из восьми быстрых ядер можно представить как два ядра, что даёт суммарно 16 логических ядер. Пары логических ядер будут совместно использовать вычислительные ресурсы одного физического ядра. Например, если логическое ядро не полностью использует внутренние АЛУ, допустим, потому что оно ожидает загрузки из памяти, то код, выполняемый парным логическим ядром, всё равно сможет использовать эти простаивающие ресурсы.

Эта технология называется одновременной многопоточностью (Hyper-threading в терминологии Intel). Если у вас PC, то эту функцию часто можно отключить в BIOS.

Это объяснение очень неточное, и настоящая реализация может быть разной на различных моделях CPU, даже от одного производителя. Но главное здесь то, что логические ядра не совпадают с физическими ядрами, и этого объяснения достаточно для наших целей.

Итак, у нас возникает новый вопрос. Если забыть о привязке планировщика и тому подобном, нужно ли нам использовать количество физических или логических ядер в качестве размера пула потоков?

Неожиданный пример параллельности

Рассмотрим две функции, компилируемые в машинный код при помощи Numba. Чтобы обеспечить параллельность, мы отключим GIL.

Обе функции делают одно и то же, но одна гораздо быстрее другой. Мы можем запустить эти функции параллельно в нескольких потоках и теоретически получать линейный рост производительности, пока не закончатся ядра, просто благодаря параллельной обработке большего количества изображений.

from numba import njit
import numpy as np

@njit(nogil=True)
def slow_threshold(img, noise_threshold):
    noise_threshold = img.dtype.type(noise_threshold)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            result[i, j] = img[i, j] // 256
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            if result[i, j] < noise_threshold // 256:
                result[i, j] = 0
    return result

@njit(nogil=True)
def fast_threshold(img, noise_threshold):
    noise_threshold = np.uint8(noise_threshold // 256)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            value = img[i, j] >> 8
            value = (
                0 if value < noise_threshold else value
            )
            result[i, j] = value
    return result

Мы запустим функцию для изображения и замерим, как долго она выполняется:

rng = np.random.default_rng(12345)

def make_image(size=256):
    noise = rng.integers(0, high=1000, size=(size, size), dtype=np.uint16)
    signal = rng.integers(0, high=5000, size=(size, size), dtype=np.uint16)
    # A noisy, hard to predict image:
    return noise | signal

NOISY_IMAGE = make_image()
assert np.array_equal(
    slow_threshold(NOISY_IMAGE, 1000),
    fast_threshold(NOISY_IMAGE, 1000)
)

Вот сколько времени требуется для выполнения каждой из функций на одном ядре:

%timeit slow_threshold(NOISY_IMAGE, 1000)
90.6 µs ± 77.7 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

и

%timeit fast_threshold(NOISY_IMAGE, 1000)
24.6 µs ± 10.8 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Вам интересно, почему быстрая функция намного быстрее? Возможно, стоит прочитать книгу об оптимизации низкоуровневого кода, над которой я работаю.

Масштабируемся до нескольких ядер

Теперь, когда у нас есть пара функций, можно обработать список изображений при помощи пула потоков; при этом каждый поток будет обрабатывать по 10 изображений за раз:

from multiprocessing.dummy import Pool as ThreadPool

def apply_in_thread_pool(
    num_threads, function, images
):
    with ThreadPool(num_threads) as pool:
        result = pool.map(
            lambda img: function(img, 1000),
            images,
            chunksize=10
        )
        assert len(result) == len(images)

Далее мы создадим графики времени выполнения при разных количествах потоков различных функций при помощи библиотеки benchit (можно также использовать perfplot, но учтите, что она имеет лицензию GPL):

import benchit
benchit.setparams(rep=1)

# Через каждый поток будет обработано 4000 изображений:
IMAGES = [make_image() for _ in range(4000)]

def slow_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, slow_threshold, IMAGES)

def fast_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, fast_threshold, IMAGES)

# Измеряем показатели двух функций с использованием от 1 до 24 потоков:
timings = benchit.timings(
    [slow_threshold_in_pool, fast_threshold_in_pool],
    range(1, 25),
    input_name="Number of threads"
)
timings.plot(logy=True, logx=False)
Graph showing slow vs fast runtime. Both variants have a run time that declines as the number of threads increases, up to a certain minimum runtime. Past that minimum, adding more threads actually slows things down. Slow and fast have different optimal number of threads, though!
График времени исполнения медленной и быстрой функций. У обоих вариантов время исполнения снижается с увеличением количества потоков до определённого минимального времени. Ниже этого минимума добавление потоков замедляет работу. Однако у медленной и быстрой функций оптимальное количество потоков различается!

Обратите внимание, что время исполнения снижается с увеличением количества потоков... до определённой точки. После неё время исполнения снова начинает повышаться. Этого мы и ожидали. Но есть и нечто неожиданное: оптимальное количество потоков для двух функций различается.

timings.to_dataframe().idxmin(axis="rows")

Функции

Оптимальное количество потоков

slow_threshold

19

fast_threshold

9

Оптимальный уровень параллельности также зависит от вашего кода

Медленная функция, по сути, способна использовать преимущества всех логических ядер. Возможно, один поток не использует полностью всю доступную вычислительную мощь конкретного физического ядра, так что логические ядра обеспечивают бОльшую параллельность.

И наоборот, более быстрая функция может использовать преимущества не более чем девяти ядер; при увеличении их количества начинается замедление. Возможно, она упирается в какое-то другое узкое место, не в вычислительные ресурсы, например, в пропускную способность памяти.

Не существует размера пула потоков, оптимального для обеих функций.

Другой подход: эмпирические измерения

При определении оптимального количества потоков мы столкнулись с несколькими проблемами:

  1. Сложно получить точное количество ядер с учётом всевозможных способов, которыми операционная система способна ограничивать используемые ресурсы CPU.

  2. Оптимальный уровень параллельности, то есть количество потоков зависит от рабочей нагрузки.

  3. Количество ядер — не единственное узкое место.

  4. Бонусная проблема: если вы исполняете код в облаке, то используете «vCPU», что бы это ни значило. Например, в разных инстансах используются разные модели CPU.

Есть и другой подход: определять оптимальное количество потоков эмпирически, во время исполнения. В показанном выше примере мы измеряли оптимальное количество потоков для конкретного фрагмента кода. Если у вас длительная задача по обработке данных, долго исполняющую один и тот же код в нескольких потоках, то можно сделать так же. То есть потратить немного времени в начале процесса, чтобы эмпирически замерить оптимальное количество потоков, возможно, с какими-то эвристиками для компенсации шума.

Если в среде исполнения вы используете эмпирические измерения, то вам не нужно беспокоиться о том, почему оптимально конкретное количество потоков. Вне зависимости от оборудования, конфигурации операционной системы или облачной среды вы будете использовать оптимальный уровень параллельности.

Комментарии (22)


  1. Casus
    29.12.2023 11:15
    +5

    Если очень нужен питон и по другому ни как, то конечно можно.. но вообще дичь, просто неиспользуйте питон там где не надо.

    А то как в известной мысли про молоток и гвозди.


    1. Andrey_Solomatin
      29.12.2023 11:15

      Вы точно читали статью?

      Все вычисления далаются не на Питоне. В общем Питон вообще можно убрать из заголовка, это будет справедливо для всех языков.


      1. Casus
        29.12.2023 11:15
        +5

        С чего бы для всех?

        С go пропорции рутин и тредов вообще мало кого волнуют, а тех кого волнует не редко пишут на c++/rust.

        Проблем с определением количества ресурсов нет, с cgroups тоже, пример: go, jvm, c# и куча прочих.

        А если вы про сишную либу под капотом.. ну давайте поговрим о трединге/асинке в php? Там тоже ведь сишные либы подкапотом...


        1. Andrey_Solomatin
          29.12.2023 11:15

          С go пропорции рутин и тредов вообще мало кого волнуют


          В Golang по тем же критериям можно выбирать значение для GOMAXPROCS.

          Размер ThreadPool в джаве и С++ тоже нужно как-то устанавливать. Думаете там всё по другому? И они умеют найти нужное количество процессоров изнутри докера?

          А если вы про сишную либу под капотом

          Если эти либы использовать эффективно, то есть один раз положить туда дынные и один раз достать то скорость будет близка к скорости самой либы. Вне зависимости от языка который это всё оркестрирует. Так всё машинное обучение на Питоне работает.

          В любой задаче, где вам нужно будет перемалывать числа, вам придётся подбирать количество процессоров под ваш кейс. Чем более эффективен язык, тем с большей погрешностью это можно делать.


  1. theurus
    29.12.2023 11:15
    -1

    Захотелось мне однажды найти все счастливые ip адреса. Написал на питоне программу которая перебирала 4млрд адресов и считала суммы чисел внутри адреса.

    Запустил на одном ядре, понял что не дождусь.

    Запустил на всех ядрах, понял что не дождусь.

    Попросил чатгпт переписать на си. Сишная версия отработала на одном ядре за 5 секунд.


    1. bolk
      29.12.2023 11:15

      А что такое «счастливые ip-адреса»? ) сумма первых шести и последних шести совпадает?


      1. Zara6502
        29.12.2023 11:15

        это так же как астрология.

        на авито вон продают купюры со "счастливыми" номерами типа 57034723, видимо они счастливые для того кто их продаст.


      1. theurus
        29.12.2023 11:15

        Да что то типа того. Увидел у одного хостера предложение получить счастливый адрес и захотелось узнать а сколько их вообще.


        1. bolk
          29.12.2023 11:15

          Количество счастливых билетов можно получить без перебора. Тут, думаю, можно применить те же методы.


    1. MountainGoat
      29.12.2023 11:15
      +3

      Вы меня сподвигли на эксперименты.

      Наивнейшая реализация
      octet = list(range(1,255))
      
      counter = 0
      
      for i1 in octet:
          print (f"{i1}", end='\r')
          for i2 in octet:
              for i3 in octet:
                  for i4 in octet:
                      if i1+i2 == i3+i4:
                          counter += 1
      
      print (counter)

      Заняла 4 минуты 13 секунд. Попробовал использовать модуль array - время совсем не изменилось. Удивился, думал хоть немного спасёт.

      Переписываем, чуть-чуть включив мозги.

      Вариант с мозгами
      from collections import defaultdict 
      from timeit import default_timer as timer
      
      start_t = timer()
      
      octet = list(range(1,255))
      
      variantos = defaultdict(int)
      for i1 in octet:
          print (f"{i1}", end='\r')
          for i2 in octet:
              summ = i1+ i2
              variantos[summ] += 1
      
      counter = 0
      for k, v in variantos.items():
          counter += v*v
              
      print (counter)
      end_t = timer()
      print(end_t - start_t)

      Причём первый цикл наверное не нужен - я просто не уверен, что список возможных сумм непрерывный, а проверять влом.

      Получилось 0.012 секунд. Ой.

      Результат, кстати, 10924794


      1. squaremirrow
        29.12.2023 11:15

        Как насчет такой тупой реализации, обернутой в нумбу?

        Тупой вариант с нумбой
        import numba as nb
        import numpy as np
        
        from timeit import default_timer as timer
        
        start_t = timer()
        
        @nb.njit
        def fast_counter():
            octet = np.arange(1, 255, dtype=np.uint8)
            counter = 0
            for i1 in octet:
                #print (f"{i1}", end='\r')
                for i2 in octet:
                    for i3 in octet:
                        for i4 in octet:
                            if i1+i2 == i3+i4:
                                counter += 1
            return counter
        
        fast_counter()
        end_t = timer()
        print(end_t - start_t)


        1. MountainGoat
          29.12.2023 11:15
          +1

          0.7611 секунд. И ещё ждать, пока нумба установится...

          0.6482 секунд, если заменить dtype=np.uint8 на dtype=np.int32

          0.5804 секунд, если не использовать numpy и вернуть list как было.


      1. 9982th
        29.12.2023 11:15

        Вы считаете сумму октетов, а не сумму цифр в октетах, это заметно быстрее.


        1. MountainGoat
          29.12.2023 11:15

          del


        1. 9982th
          29.12.2023 11:15

          Не так уж и заметно, оказывается. Если рассчитать сумму цифр возможных значений октета заранее, то выходит 4:28 на моей машине.

          Hidden text
          sum_of_digits = [sum(int(x) for x in str(number)) for number in range(256)]
          counter = 0
          octet = tuple(range(0, 256))
          for i1 in range(1, 256):
              print (f"{i1}", end='\n')
              for i2 in octet:
                  for i3 in octet:
                      for i4 in octet:
                          if sum_of_digits[i1] + sum_of_digits[i2] == sum_of_digits[i3] + sum_of_digits[i4]:
                              counter += 1
          print(counter)

          Если переписать на C, то получается (на более слабой машине)

          real    0m9.139s
          user    0m9.133s
          sys     0m0.005s


      1. Sly_tom_cat
        29.12.2023 11:15

        Ну если еще чуть подумать, то variations можно еще быстрее собрать (О(n) вместо О(n^2)).


        1. Sly_tom_cat
          29.12.2023 11:15

          Хотя если еще подумать, то и собирать variations не зачем там вообще один цикл на самом деле нужен:

          l = 256
          t = 0
          for s in range(1, l):
              t += s*s * 2
          t += l*l
          print(t)

          Да, и ответ у вас неверный. Вы почему-то считаете, что октет может иметь значения от 1 до 254 (включительно) хотя он от 0 до 255. И хотя некоторые счастливые IP v4 и зарезервированы как служебные и никто их выдать не может, но абстрагируюясь от этого ответ будет 11184896. Зарезервированные счастливые можно при желании вычесть.

          Варианты: ваш "умный", мой с O(n) при построении variations и тот что выше:
          11184896
          0.010685138004191685
          11184896
          0.00014450500020757318
          11184896
          4.1819999751169235e-05


  1. Andrey_Solomatin
    29.12.2023 11:15
    +12

    Вам интересно, почему быстрая функция намного быстрее? Возможно, стоит прочитать книгу об оптимизации низкоуровневого кода, над которой я работаю.



    Два раза заполнить массив и один раз заполнить массив. Что здесь низкоуровневого?

    Может сначала надо научиться пользоваться оптимизациями numpy?

    Зачем вам умный контейнер для быстрых вычислений над элементами, если вы делаете всё снаружи?


    Если вам надо сделать один массив из другого, да еще и одинаковой размерности, то надо все делать внутри.

    from timeit import timeit
    
    import numpy as np
    
    def f_bad(img, noise_threshold=5):
        result = np.empty(arr.shape, arr.dtype)
        for i in range(result.shape[0]):
            result[i] = 0 if img[i] < noise_threshold else img[i]
        return result
    
    def f_good(img, noise_threshold=5):
        return np.where(img < noise_threshold, 0, arr)
    
    
    if __name__ == "__main__":
        arr = np.array(list(range(100)))
        print(timeit("f_bad(arr)", globals=globals())) # 15.4994
        print(timeit("f_good(arr)", globals=globals())) # 1.2924
    
    


  1. Dominux
    29.12.2023 11:15
    +4

    При выполнении параллельной программы, активно задействующей CPU, нам часто необходимо, чтобы пул потоков или процессов имел размер, сопоставимый с количеством ядер CPU на машине

    Распараллеливание CPU-bound задач сводится к их непосредственному запуску процессов на разных ядрах CPU. Потоки, в данном случае, не подходят в принципе, т.к. процесс исполняется на одном ядре, и сколько потоков не выделяй -- они все будут исполняться в рамках одного.

    Конкретно Python располагает не малым арсеналом встроенных механизмов распараллеливания задач на разных ядрах. Так, например, могу порекомендовать посмотреть в сторону ProcessPoolExecutor, т.к. он предоставляет весьма удобный интерфейс для управления пулом процессов на высоком уровне

    Количество ядер, которое можно эффективно использовать, зависит от того кода, который напишете вы!

    Интересно, а как интерпретатор должен догадаться за вас, что, когда и как ему нужно распараллеливать. Это уже какое-то чтение мыслей получается

    Прибавив ещё и то, что автор берет numpy и не знает, что это обёртка над C-библиотекой, которая спрашивает о выделении процессов у ОС, а не у CPython, и выходит, что статья написана в стиле "Я тут зашел в ваш Python и чет он ничего не может"

    P.s.: считаю, что на хабре кроме редактуры на грамматику необходимо также ввести цензор на ошибочные или глупые статьи, а то пока мы имеем вот такие статьи, где автор искренне негодует по поводу того, что интерпретатор не может сам догадаться и исполнить его код так, как он хочет, а потоки почему-то не исполняются в рамках разных ядер


    1. slonopotamus
      29.12.2023 11:15
      +3

      Потоки, в данном случае, не подходят в принципе, т.к. процесс исполняется на одном ядре, и сколько потоков не выделяй -- они все будут исполняться в рамках одного.

      Эммм... Нет. В питоне многопоточность убита GIL'ом, но во-первых в данном примере его выключили, а во-вторых, в других языках GIL'а может и не быть вовсе. При этом использование потоков вместо процессов позволяет создавать более быстрые программы, за счёт отсутствия необходимости пересылать данные между адресными пространствами.


  1. atd
    29.12.2023 11:15
    +1

    Замеры были на 12700k /thread

    ну и вообще, переводите ещё и комменты к оригинальной статье, там автору уже напихали в панамку за бенчмарк непонятно чего непонятно как


  1. anonymous
    29.12.2023 11:15

    НЛО прилетело и опубликовало эту надпись здесь