Вашему вниманию простой C++ template аллокатора с потокобезопасным циклическим буфером.

Вся реализация в одном заголовочном .h файле: [fast_mem_pool.h]

Фишки, чем этот аллокатор лучше сотни подобных — под катом.

Вот как работает мой велосипед.

1) В Release сборке нет ни одного мьютекса и ни одного цикла ожидания на atomic — но при этом аллокатор циклический, и непрерывно регенерирует ресурсы по мере их высвобождения потоками. Как он это делает?

Каждая аллокация оперативной памяти, которую FastMemPool отдаёт через fmalloc, на самом деле больше на заголовок:

  struct AllocHeader {
//  метка своих аллокаций: tag_this = this + leaf_id
    uint64_t  tag_this  {  2020071700  };  
// размер аллокации:
    int  size;  
// быстрый доступ к листу аллокации:
    int  leaf_id  {  -2020071708  };  
  };

Этот заголовок можно всегда получить из указателя, которым владеет пользователь, отмотав вниз от указателя (res_ptr) размер sizeof(AllocHeader):

image

По содержимому заголовка AllocHeader метод ffree(void *ptr) распознаёт свои аллокации и узнаёт, в какой из листов циклического буфера возвращается память:

  void  ffree(void  *ptr)
  {
    char  *to_free  =  static_cast<char  *>(ptr)  
         -  sizeof(AllocHeader);
    AllocHeader  *head  =  reinterpret_cast<AllocHeader  *>(to_free);

Когда в аллокатор обращаются за выделением памяти, тот смотрит на текущем листе массива листов может ли отрезать требуемый объем + размер заголовка sizeof(AllocHeader).

В аллокаторе заранее зарезервировано Leaf_Cnt листов памяти, каждый лист размером Leaf_Size_Bytes (тут всё традиционно). В поисках возможности аллокации метод fmalloc(std::size_t allocation_size) пройдёт круг по листам массива leaf_array, и если везде всё занято, то при при условии что разрешен флаг Do_OS_malloc, возьмёт память у операционной системы больше необходимого размера на sizeof(AllocHeader) — вне зависимости взята память из внутреннего циклического буфера или из OS, аллокатор всегда создаёт заголовок со служебной информацией. Если в аллокаторе память закончилась и флаг Do_OS_malloc == false, то fmalloc вернёт nullptr — такое поведение полезно чтобы контролировать нагрузку (например пропускать кадры с видеокамеры когда модули обработки кадров не успевают за FPS камеры).

Как реализована цикличность


Циклические аллокаторы рассчитаны на цикличность задач — задачи не должны длиться вечно. Например, это могут быть:

  • аллокации под сессии пользователей
  • обработка кадров видеопотока для видео аналитики
  • жизнь боевых юнитов в игре

Так как в массиве leaf_array листов памяти может быть любое количество, в пределе можно сделать по странице на теоретически возможное количество боевых юнитов в игре, так что с условием выбывания юнитов мы гарантированно получаем свободный лист памяти. На практике для видео аналитики мне обычно достаточно 16 больших листов, из которых несколько первых листов жертвуются на длительные нецикличные аллокации при инициализации детектора.

Как реализована потокобезопасность


Массив листов аллокаций работает без мьютексов… Защита от ошибок типа «data race» сделана следующим образом:

      char  *buf;
      // available == offset наоборот
      std::atomic<int>  available  {  Leaf_Size_Bytes  };
      // allocated == контроль деаллокаций
      std::atomic<int>  deallocated  {  0  };

Каждый лист памяти имеет 2 счётчика:

— available проинициализирован размером листа Leaf_Size_Bytes. При каждой аллокации происходит уменьшение этого счётчика, и этот же счётчик служит смещением относительно начала листа памяти == память выделяется с конца буфера:

result_ptr  =  leaf_array[leaf_id].buf + available_after;

— deallocated проинициализирован { 0 } нулём, и при каждой деаллокации на данном листе (о том на каком из листов или из OS идёт деаллокация я узнаю из AllocHeader) счётчик увеличивается на высвобожденный объём:

const int  deallocated  =  leaf_array[head->leaf_id].deallocated.fetch_add(real_size, std::memory_order_acq_rel)  +  real_size;

Как только счётчики вот так (deallocated == (Leaf_Size_Bytes — available)) совпадают, это означает что всё что было аллоцировано теперь освобождено и можно сбросить лист в исходное состояние, но тут тонкий момент: а что будет если уже после принятия решения о сбросе листа в исходное, кто-то аллоцирует с листа ещё один маленький кусочек памяти… Чтобы подобное исключить используется проверка compare_exchange_strong:

if (deallocated  == (Leaf_Size_Bytes - available))
{  // всё что было аллоцировано теперь возвращено,
  // попытаемся, осторожно, сбросить Leaf
  if (leaf_array[head->leaf_id].available
      .compare_exchange_strong(available,  Leaf_Size_Bytes))
  {
    leaf_array[head->leaf_id].deallocated  -=  deallocated;
  }
}

Сброс листа памяти в исходное состояние происходит только если в момент сброса сохранилось тот же состояние счётчика available, что было на момент принятия решения. Та-даа!!!

Приятным бонусом будет то что по заголовку AllocHeader каждой аллокации можно отлавливать следующие баги:

  • повторная деаллокация
  • деаллокация чужой памяти
  • buffer overflow
  • обращение к чужой области памяти

На этих возможностях реализована вторая фишка.

2) В Debug компиляции предоставляется точная информация где была предыдущая деаллокация при повторной деаллокации: имя файла, номер строки кода, название метода. Реализовано это в виде декораторов вокруг базовых методов (fmallocd, ffreed, check_accessd — у debug версии методов на конце буква d):

/**
 * @brief FFREE  -  функция освобождения аллокации вместо free
 * @param iFastMemPool  -  экземпляр FastMemPool в котором проводили аллокацию
 * @param ptr  -  указатель на аллокацию через fmaloc
 */
#if defined(Debug)
#define FFREE(iFastMemPool, ptr)    (iFastMemPool)->ffreed (__FILE__, __LINE__, __FUNCTION__, ptr)
#else
#define FFREE(iFastMemPool, ptr)    (iFastMemPool)->ffree (ptr)
#endif

Используются макросы препроцессора:

  • __FILE__ — файл исходника c++
  • __LINE__ — номер линии в файле исходника c++
  • __FUNCTION__ — имя функции где это произошло

Эта информация сохраняется в виде соответствия указателя на аллокацию и информации об аллокации в медиаторе:

  struct AllocInfo {
// кто произвёл операцию: файл, номер строки кода, в каком методе:
    std::string  who;  
//  true - аллоцировано,  false - деаллоцировано:
    bool  allocated  {  false  };  
  };
  std::map<void *,  AllocInfo>  map_alloc_info;
  std::mutex  mut_map_alloc_info;

Так как для отладки скорость уже не так важна, был использован мьютекс защищающий стандартный std::map. Параметр шаблона (bool Raise_Exeptions = DEF_Raise_Exeptions) влияет на то вызывать ли Exception на ошибку.

Для тех кто хочет максимального комфорта в Release сборке, можно установить флаг DEF_Auto_deallocate, тогда все OS malloc аллокации будут записаны (уже под мьютексом в std::set<>) и в деструкторе FastMemPool высвобождены (использование в качестве трекера аллокаций).

3) Для исключения ошибок типа «buffer overflow» рекомендую использовать проверку FastMemPool::check_access перед тем как начать работать с аллоцированной памятью. В то время как операционная система жалуется только когда залезаешь в чужую оперативную память, функция check_access (или макрос FCHECK_ACCESS) по заголовку AllocHeader вычисляет будет ли выход за пределы данной аллокации:

  /**
   * @brief check_access  -  проверка возможности доступа к целевой области памяти
   * @param base_alloc_ptr - предполагаемый адрес базовой аллокации из FastMemPool
   * @param target_ptr  -  начало целевой области памяти
   * @param target_size  -  размер структуры, к которой нужен доступ
   * @return - true если целевая область памяти принадлежит базовой аллокации из FastMemPool
   */
  bool  check_access(void  *base_alloc_ptr,  void  *target_ptr,  std::size_t  target_size)

// Пример использования:
  if (FCHECK_ACCESS(fastMemPool, elem.array, 
      &elem.array[elem.array_size - 1], sizeof (int))) 
  {
    elem.array[elem.array_size - 1] = rand();
  }

Зная указатель начальной аллокации всегда можно получить заголовок, из заголовка узнаем размер аллокации, а дальше рассчитываем будет ли целевой элемент находится в пределах начальной аллокации. Проверить достаточно 1 раз перед началом цикла обработки на максимальных теоретически возможных доступах. Очень даже может быть что предельные значения пробьют границы аллокации (например в расчетах закладываешься что какая-то переменная может гулять только в определенном диапазоне в силу физики процесса, и поэтому не делаешь проверки на пробитие границы аллокации).

Лучше 1 раз проверить, чем потом неделю убить на поиски того кто изредка пишет в твои структуры случайные данные…

4) Настройка Default кода шаблона в compile time через CMake.

В CmakeLists.txt указываются настраиваемые параметры, например:

set(DEF_Leaf_Size_Bytes "65536" CACHE PATH "Size of each memory pool leaf")
message("DEF_Leaf_Size_Bytes: ${DEF_Leaf_Size_Bytes}")
set(DEF_Leaf_Cnt "16" CACHE PATH "Memory pool leaf count")
message("DEF_Leaf_Cnt: ${DEF_Leaf_Cnt}")

Так параметры становится очень удобно редактировать в QtCreator:

image

или CMake GUI:

image

Далее параметры передаются при компиляции в код так:

set(SPEC_DEFINITIONS
      ${CMAKE_SYSTEM_NAME}
      ${CMAKE_BUILD_TYPE}
      ${SPEC_BUILD}
      SPEC_VERSION="${Proj_VERSION}"
      DEF_Leaf_Size_Bytes=${DEF_Leaf_Size_Bytes}
      DEF_Leaf_Cnt=${DEF_Leaf_Cnt}
      DEF_Average_Allocation=${DEF_Average_Allocation}
      DEF_Need_Registry=${DEF_Need_Registry}
  )
#через
target_compile_definitions(${TARGET} PUBLIC ${TARGET_DEFINITIONS})

и в коде переопределяют в Default значения шаблона:

#ifndef DEF_Leaf_Size_Bytes
  #define DEF_Leaf_Size_Bytes  65535
#endif


template<int Leaf_Size_Bytes = DEF_Leaf_Size_Bytes, 
    int Leaf_Cnt = DEF_Leaf_Cnt,
    int Average_Allocation = DEF_Average_Allocation,
    bool Do_OS_malloc = DEF_Do_OS_malloc,
    bool Need_Registry = DEF_Need_Registry, 
    bool Raise_Exeptions = DEF_Raise_Exeptions>
class FastMemPool
{
// ..
};

Так шаблон аллокатора можно комфортно настраивать мышкой включая/выключая галочки CMake параметров.

5) Для того чтобы можно было использовать аллокатор в STL контейнерах в том же .h -нике в шаблоне FastMemPoolAllocator частично реализованы возможности std::allocator:

// можно инжектировать как compile time шаблон :
std::unordered_map<int,  int, std::hash<int>,
  std::equal_to<int>,
  FastMemPoolAllocator<std::pair<const int,  int>> >   umap1;

// можно инжектировать как runtime объект :
std::unordered_map<int,  int>  umap2(
   1024, std::hash<int>(),
   std::equal_to<int>(),
   FastMemPoolAllocator<std::pair<const int,  int>>());

Примеры использования можно посмотреть тут: test_allocator1.cpp и test_stl_allocator2.cpp.

Например работа конструкторов и деструкторов на аллокации:

bool test_Strategy()
{
  /*
   * Инжектируем Стратегию аллокации в Runtime
   *  (будем использовать конкретный экземпляр такого типа)
 */
  using MyAllocatorType = FastMemPool<333, 33>;
// instance of:
  MyAllocatorType  fastMemPool;  
// inject instance:
  FastMemPoolAllocator<std::string,
     MyAllocatorType > myAllocator(&fastMemPool); 
  // аллоцируем кусок памяти на 3 строки:
  std::string* str = myAllocator.allocate(3);
  // даём отработать конструкторам каждой строки: 
  myAllocator.construct(str, "Mother ");
  myAllocator.construct(str + 1, " washed ");
  myAllocator.construct(str + 2, "the frame");

//как-то используем
  std::cout << str[0] << str[1] << str[2]; 

  // вызываем деструкторы:
  myAllocator.destroy(str);
  myAllocator.destroy(str + 1);
  myAllocator.destroy(str + 2);
  // возвращаем память:
  myAllocator.deallocate(str, 3);
  return  true;
}

6) Бывает в большом проекте делаешь какой-то модуль, тщательно всё протестировал — работает как швейцарские часы. Твой модуль включают в состав Детектора, ставят на бой — и там иногда, раз в день, библиотека начинает падать в дамп. Запустив дамп на дебагере обнаруживаешь что в одном из циклов обхода указателей вместо nullptr в твой указатель кто-то записал цифру 8 — перейдя на этот указатель ты естественно вызвал гнев операционной системы.

Как сузить диапазон возможных виновников? Исключить из подозреваемых свои структуры очень просто — их необходимо передвинуть в RAM в другое место (туда где диверсант не бомбит):

image

Как это легко сделать с помощью FastMemPool? Очень просто: в FastMemPool аллокация происходит откусыванием с конца страницы памяти — запросив страницу памяти больше чем Вам необходимо для работы, Вы гарантируете что начало страницы памяти останется полигоном для глючных бомбардировок. Например:

FastMemPool<100000000, 1, 1024, true, true>  bulletproof_mempool;
void *ptr = bulletproof_mempool.fmalloc(1234567);
// ..
// делаем что-то полезное для людей c ptr
// ..
bulletproof_mempool.ffree(ptr);

Если и на новом месте кто-то бомбит твои структуры — то это скорей всего ты сам…

Иначе если библиотека стабилизируется, то команда получает сразу несколько подарков:

  • твой алгоритм опять работает как швейцарские часы
  • глючный кодер может теперь безопасно бомбить пустую область памяти (пока все его ищут), библиотека стабильно работает.
  • полигон бомбардировки можно мониторить на изменение памяти — поставить капканы на глючного кодера.



Итого, какие преимущества даёт именно этот велосипед:

  • потокобезопасная быстрая аллокация и деаллокация памяти
  • функция проверки доступа (даёт понимание своя/чужая память даже в рамках одного процесса)
  • защита от двойной деаллокации, возможность в Debug узнать, где именно была сделана первая деаллокация
  • если заканчивается свой пул памяти, то может взять у операционной системы и при этом сохранить функциональность проверки доступа/защитить от двойной деаллокации
  • если заканчивается свой пул памяти, то можно НЕ брать дополнительно у операционной системы (возвращать nullptr), оставаясь в рамках аллокаций циклического буфера — это очень полезно когда требуется оставаться в рамках выделенных ресурсов, например при анализе поступающих видео кадров (когда обработка каждого кадра идёт медленнее чем FPS камеры, FastMemPool становится своего рода регулятором-предохранителем).

В нашей компании для установки анализа 3D геометрии листов металла потребовалась многопоточная обработка видеокадров (50FPS). Листы проезжают под камерой и по отражению лазера я строю 3D карту листа. Для обеспечения максимальной скорости работы с памятью и безопасности был использован FastMemPool. Если потоки не справляются с поступающими кадрами, то сохранение кадров для будущей обработки обычным способом ведёт к неконтролируемому расходу оперативной памяти. С FastMemPool при переполнении будет просто возвращён nullptr при аллокации и кадр будет пропущен — на итоговом 3D изображении такой дефект в виде скачка ступенькой показывает что необходимо добавить потоков CPU в обработку.

Безмьютексная работа потоков с циклическим аллокатором памяти и стеком задач позволила обрабатывать поступающие кадры очень быстро, без потерь кадров и без переполнения RAM. Сейчас этот код работает в 16 потоков на CPU AMD Ryzen 9 3950X, сбоев в работе класса FastMemPool не выявлено.

Упрощённый пример-схему работы процесса видеоаналитики с контролем от переполнения RAM можно увидеть в исходнике test_memcontrol1.cpp.

И на десерт: в этом же примере-схеме используется безмьютексный стек:

using  TWorkStack = SpecSafeStack<VideoFrame>;
//..
  // Video frames exchanger:
TWorkStack  work_stack;
//..
work_staff->work_stack.push(frame);
//..
VideoFrame * frame = work_staff->work_stack.pop();

Работающий демо-стенд со всеми исходниками тут на gihub.