текст обновлен 12.03.16 07:53

При планировании любой задачи мы стремимся как можно точнее конкретизировать запросы, определить исходные данные и по возможности избавиться от любой неопределенности, мешающей просчитать конечный результат. Однако при разработке высокоуровневой логики не всегда уделяется внимание таким простым казалось бы вещам, как размещение данных в памяти, менеджмент потоков, обрабатывающих наш функционал, особенности реализации динамических массивов или бинарный интерфейс процедур. Когда написанная программа предельно лаконична и оптимизирована, но при этом работает не так быстро, как хотелось бы, закономерно возникает вопрос: «а что еще можно улучшить?» Насколько можно доверять низкоуровневому инструментарию, написанному профессиональными программистами, безусловно разбирающимися в своем деле, но при этом ни черта не понимающими в тех идеях, что вы хотите реализовать? Фрагментация, зацикленность, прерывания, события, объекты, уведомления, каждое новое знакомство с Си-шными или WinAPI-шными библиотеками подталкивает к очевидной мысли: «зачем такая громоздкая реализация?» Почему нельзя просто сделать менеджер кучи выделяющий память за строгое количество шагов? Использовать real-time статистику при работе с разделяемыми данными, вместо сложной системы семафоров и уведомлений? Наращивать динамические массивы без переразмещения и обращаться к случайной ячейке за одинаковое время после любого количества реформаций? Миссия не кажется невыполнимой. Самое время поэкспериментировать.

Предлагаю широкому вниманию процедурную реализацию менеджера кучи, способного выделять диапазоны памяти заказных размеров. Цикл поиска при этом не превышает нормированных пределов. Работа с памятью состоит всего из двух процедур: memAlloc и memRecycle, выполняющих связанные функции. Потокобезопасность поддерживается с помощью дополнительного инструментария, в свою очередь состоящего еще из нескольких процедур. О распараллеливании немного поподробнее: принцип базируется на ассоциированной блокировке разделяемых данных, схожих с блокировкой шины методами Interlocked, однако без блокировки шины. Выбор такого подхода прост — блокировка шины работает немного медленнее. Весь процесс сводится к созданию позиционных стопоров, являющихся по сути диапазонами памяти, содержащими однобайтовые метки состояний каждого из потоков в доступном пуле. Размер выводится из этого соотношения. Поскольку я использую восьмипотоковый пул (а больше мне не нужно), то позиционные стопоры у меня занимают 8 байт (64 бита). Перед перезаписью разделяемой информации стопор блокируется исполняющим потоком, записывая метку в байт под смещением своего номера в пуле. Другие потоки не будут работать с разделяемыми данными пока стопор не обнулится, выполняя Sleep, либо откладывая задачу, либо считая овечек в цикле, на выбор программиста.

Забегая немного вперед скажу что в сравнении со стандартными malloc/free, новоизобретенным колесам велосипеда удается работать быстрее от нескольких десятков до сотни раз. Оценить результаты многопоточных тестов можно в конце статьи. Довольно ощутимая разница особенно ярко проявляет себя при работе с масштабными потоками данных, поступающих с разных направлений одновременно. Оптимизация казалось бы рутинного процесса аллоцирования памяти оказывается особенно актуальной при работе с логическими и векторными сетями, когда разнотипные данные, принадлежащие связанным множествам не могут быть записаны в последовательные массивы. Размещение таких данных осуществляется иерархически и каждый диапазон модифицируется, переразмещается и удаляется независимо. Только представьте себе чтение текста из 70 тыс строк и сохранение его логической структуры и вы сразу поймете, что процедура распределения памяти — это основа. Тот фундамент на котором должен строиться грамотный алгоритм.

Особенности реализации
Основной особенностью алгоритма выделения памяти в данном менеджере является перераспределение вычислений. Часть операций, необходимых для поиска последовательного диапазона внутри фрагментированной памяти, выполняется как предподготовка при запуске функции отработки memRecycle. Идея этого заключена в делении возвращаемых отрезков на размерные группы, количество которых определяется программистом на этапе запуска процедуры-конструктора. Именно цепочка отработки проверяется первой при запросе нового диапазона. Именно количество размерных групп является величиной цикла поиска перед выделением нового диапазона. memAlloc ищет группу, соответствующую размеру запрошенного отрезка + 1, после чего берет указатель на первый в цепочке диапазон, поскольку он безусловно больше запрашиваемого на одну ступень. При отсутствии в цепочках отработки диапазона необходимого размера, память берется из самой большой верхней группы, в которой хранится неразмеченый запас, величина которого также указывается при запуске конструктора.

Таким образом для получения одного отрезка длины не менее запрошенной, обработчик делает однократный цикл с фиксированным количеством шагов (по размерным группам) в поисках отработанных диапазонов и берет первый попавшийся нужной размерности + 1. Для возврата отрезка обратно memRecycle выполняет еще меньше действий, всего лишь удлиняя цепочку нужной размерной группы еще одним указателем. Без циклов. Этой нехитрой последовательностью действий достигается высокая производительность: что такое для современных процессоров цикл из двух сотен проверок опустошения указателя, когда они способны исполнять циклы из миллионов шагов менее чем за 1 миллисекунду.



Реализация потокобезопасности является вариацией на тему программно-транзакционной памяти и осуществляется ступенчатым процессом, представляющим собой одну неразрывную логическую последовательность. Условие для получения эксклюзивного доступа к разделяемому диапазону памяти достигается тривиальной установкой битовой метки в логически связанный позиционный стопор, в котором для каждого потока из пула выделен собственный байт. ASM-процедура перед обращением к разделяемой памяти проверяет стопор на нуль, при соответствии записывая метку в байт со смещением номера потока. После чего стопор подвергается проверке по маске, в которой должен присутствовать только один бит под нужным смещением. Несоответствие означает одновременное редактирование стопора параллельными потоками, процедура возвращает нуль. Каждый поток, получивший нуль при попытке перезаписи разделяемых данных уходит в цикл ожидания, реализация которого может подбираться на свой вкус, после чего следует повторная попытка. При соответствии маски процедура возвращает единичку, что свидетельствует об эксклюзивном доступе к данным.

Стоит отметить, что при работе вне пула, в потоке не имеющем номера из пула, все же используется блокировка шины. Важным моментом в свете этого решения является количество потоков, допущенных для работы с разделяемыми данными и соблюдение программистом корректности масштабирования алгоритма. Поскольку в данной реализации оно ограничено оптимальным с точки зрения операционных издержек числом 8, а также используются стопоры размером 8х8=64 бита, то все инструкции используют qword ptr для чтения или обработки. Создание оперативного пула для выполнения задач требует минимальной настройки исходных данных, а именно: присвоение уникальных номеров от нуля до значения workPoolMax, определяемого программистом и согласуемого с размером стопоров. Если планируется доступ к разделяемым данным вне пула, например из main-потока, висящего в ожидании пользовательского ввода или других, необходимо присвоить им номера меньше нуля (или больше workPoolMax, с добавлением соответствующих условий в процедуры) и вместе с этим сократить размер рабочего пула, чтобы не превышать общее кол-во потоков, допущенных к работе с разделяемыми данными.



Листинг ассемблерных процедур:

cnsBit PROC				;check and set bit
	cmp rdx,0			;первичная проверка на нумерацию потока
	jl @negtv			;отрицательные значения не относятся к нумерованному пулу и обрабатываются отдельно

	cmp qword ptr [rcx],0		;проверка метки на нуль
	jne @fin			;выход при несоответствии

	mov r8,rdx			;подготовка номера потока для смещения
	mov rax,8			;подготовка множителя, биты устанавливаются в начале каждого байта
	mul rdx				;умножение для получения итогового смещения

	mov byte ptr [rcx+r8],1		;установка значения по нужному адресу
	bts rdx,rax			;установка бита в маске по необходимому смещению

	cmp qword ptr [rcx],rdx		;проверка метки по маске
	jnz @end			;выход при несоответствии

	@scess:				;блок соответствия
	mov rax,1			;собственный бит присутствует в диапазоне, сторонние биты отсутствуют
	ret

	@end:				;блок несоответствия
	mov byte ptr [rcx+r8],0		;обнуление байта в стопоре

	@fin:				;блок безусловного выхода
	mov rax,0			;отсутствует собственный бит, либо присутствуют сторонние биты
	ret

	@negtv:				;блок для ненумерованных потоков
	xor rax,rax			;сравниваемый операнд равен нулю
	mov rdx, -1			;стопор будет заполнен битами по всему объему
	lock cmpxchg qword ptr[rcx],rdx	;compare exchange
	jnz @fin
	jmp @scess
cnsBit ENDP


setBit PROC				;set bit
	cmp rdx,0			;первичная проверка на нумерацию потока
	jl @negtv			;отрицательные значения не относятся к нумерованному пулу и обрабатываются отдельно

	mov byte ptr [rcx+rdx],1	;установка значения по нужному адресу
	@fin:
	ret

	@negtv:
	mov rdx, -1			;стопор будет заполнен битами по всему объему
	lock xchg qword ptr [rcx],rdx	;установка значения по нужному адресу
	ret
setBit ENDP


remBit PROC				;remove bit
	cmp rdx,0			;первичная проверка на нумерацию потока
	jl @negtv			;отрицательные значения не относятся к нумерованному пулу и обрабатываются отдельно

	mov byte ptr [rcx+rdx],0	;удаление бита из метки
	@fin:
	ret

	@negtv:
	xor rax,rax
	lock xchg qword ptr [rcx],rax	;установка значения по нужному адресу
	ret
remBit ENDP


Клоны Interlocked процедур — threadExchange, threadCompareExchange и threadExchangeAdd выполняют те же функции, что и оригиналы. Наверно их реализация не заслуживает внимания, однако кому-то возможно пригодится:

threadRepeatCounterArray = [9];
*threadRepeatCounter = &threadRepeatCounterArray[1] ;		//счетчик повторов
INT64 threadSleepTimer = 1000;
INT64 pError = 0;						//флаг ошибки


void threadStopperHold(INT64 threadNum, INT64 *stopper)
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1)
			break;
		threadRepeatCounter[threadNum] += 1;
		for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
			Sleep(0);
	}
	return;
}
INT64 threadExchangeAdd(INT64 threadNum, INT64 *counter, INT64 *stopper, INT64 value)	//итерация счетчиков. переданное значение прибавляется к находящемуся в целевом диапазоне, изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *counter;
			*counter += value;
			*stopper = 0;
			break;
		}
		threadRepeatCounter[threadNum] += 1;
		for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
			Sleep(0);
	}
	return ret;
}


INT64 threadExchange(INT64 threadNum, INT64 *pointer, INT64 *stopper, INT64 value)		//замена целевого значения на переданное. изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *pointer;
			*pointer = value;
			*stopper = 0;
			break;
		}
		threadRepeatCounter[threadNum] += 1;
		for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
			Sleep(0);
	}
	return ret;
}


INT64 threadCompareExchange(INT64 threadNum, INT64 *pointer, INT64 *stopper, INT64 value, INT64 compareVal)	//замена целевого значения на переданное при условии соответствия компаранду. изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *pointer;
			if (*pointer == compareVal) {
				*pointer = value;
			}
			*stopper = 0;
			break;
		}
		threadRepeatCounter[threadNum] += 1;
		for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
			Sleep(0);
	}
	return ret;
}

INT64 threadPointerScroll(INT64 threadNum, INT64 *pointer, INT64 *stopper, INT64 **ptrValue)	//прокрутка указателя-счетчика. целевое значение меняется на значение, находящееся в данный момент по переданному указателю. изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			if (*ptrValue == 0)
				goto error;
			ret = *pointer;
			*pointer = **ptrValue;
			*stopper = 0;
			break;
		}
		threadRepeatCounter[threadNum] += 1;
		for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
			Sleep(0);
	}
	return ret;
error:
	*stopper = 0;
	return &pError;
}


Пошаговое описание алгоритма
memoryAllock выделяет указанный отрезок из общего диапазона для размещения данных. Процедура работает с последовательным диапазоном, а также цепочкой непоследовательных отрезков, выделенных в ходе работы и разбитых на размерные группы. Группы в свою очередь делятся на две категории: малых и больших размеров. Деление необходимо для повышения точности выборки, поскольку разномерные данные, не превышающие размера одной страницы целесообразно разделять с малой градацией, масштабные структуры напротив с большей. Цепочка размерных групп не является непрерывной и представляет собой некоторое множество цепочек с персональными точками входа. Также цепочки отрезков для размещения разделены на блоки фиксированной длины (1000 шт) и представляют собой цепочки блоков, содержащих массив указателей на отрезки. Такая распределенная структура позволяет избежать излишнего многократного обращения к разделяемым данным при работе в параллельных потоках, поскольку счетчики, стопоры, публичные указатели и т.д. существуют отдельно в каждом блоке. Процедура позволяет независимо от размера диапазона памяти или цепочки отрезков производить выделение памяти или освобождение за фиксированное количество шагов. При наличии освобожденных ранее отрезков функция прежде всего предварительно проверяет среди них подходящий по размеру.

Указатели на отрезки, расположенные в блоках размерных групп, использутся одноразово. Сами блоки также извлекаются из цепи после полной однократной отработки. Recycle-блоки с указателями представляют собой цепочку из последовательно поступивших по времени указателей на освобожденные отрезки памяти, разгруппированные по размеру. Блоки ограничены по количеству хранимых указателей и в случае переполнения создаются дополнительные блоки, встраивающиеся в цепочку между текущим заполненным и следующим по размеру. Цепочка является динамической и в случае опустошения блока при повторных размещениях, блок вырезается из цепи. Изначально цепь состоит лишь из запаса неразмеченной памяти, и растет по мере возврата отработанных отрезков функцией Recycle.

Служебные данные отрезка: первые 8 байт — размер, последние 24 байта — указатель на начало отрезка, указатель на блок отработки и позицию в блоке. Каждый блок указателей устанавливает ссылку на себя в точку входа, если является первым в своей размерной группе, создавая цепь. При высвобождении всех указателей в текущей размерной группе, точка входа обнуляется. Работа с цепочкой начинается с нужной точки входа и дальнейшие перемещения осуществляются при помощи смежных указателей между блоками. При поиске необходимого отрезка памяти для размещения данных, функция Alloc проверяет точку входа округляя до следующей по размеру группы, таким образом для соответствия запросу будет достаточно первого отрезка в очереди без необходимости циклического поиска.

Описание структур:
отрезок выделяемой памяти
4 * INT64 + n
[-1] — заявленный размер отрезка
[n+1] — указатель на начало отрезка
[n+2] — указатель на блок отработки для сброса при дефрагментации
[n+3] — указатель на собственную позицию в блоке отработки

блок отработки с одноразовыми указателями:
(4 + n) * INT64
[-4] — указатель на следующий блок
[-3] — счетчик для извлекаемых указателей
[-2] — счетчик добавляемых указателей
[-1] — восьмипозиционный стопор
[0-999] — указатели на отрезки памяти одной размерной группы


INT64 pageSize;
SYSTEM_INFO sSysInfo;
INT64 SIZEOF_INT64 = sizeof(INT64);

//глобальные параметры
INT64 *baseMemStore = 0;		//указатель на неразмеченый запас
INT64 *memRecycleStore = 0;		//служебный диапазон
INT64 *memRecyclePoint = 0;		//точки входа в размерные группы
INT64 memRecycleBlock = 0;		//указатель на цепочку неактуальных блоков отработки
INT64 memClear = 0;			//модификатор для побайтового обнуления выделенного отрезка
INT64 memNoDefrag = 0;			//модификатор для переработки без дефрагментации
INT64 memFragmentCountArray[9];		//количество фрагментированных отрезков памяти для статистики
INT64 *memFragmentCount = &memFragmentCountArray[1];
INT64 memBlockLimit = 1000;		//максимальный лимит блока для отработанных отрезков памяти
INT64 memGroupLowLimit = 100;		//лимит малых размерных групп. градация: memGroupPartition / memGroupLowLimit
INT64 memGroupHighLimit = 100;		//лимит больших размерных групп. градация: (memStoreSize - memGroupPartition) / memGroupHighLimit
INT64 memGroupPartition = 3200;		//граница разделения групп. также используется при фрагментации как показатель превышения.
INT64 memGroupLimit = 0;		//предел цикла поиска, высчитывается по формуле memGroupLowLimit + memGroupHighLimit
INT64 memStoreSize = 1024 * 256 * 4096;	//предельный размер кучи


void memConstruct() {
	memRecycleStore = (INT64*)VirtualAlloc(0, 4 * 256 * 4096, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
	memRecyclePoint = (INT64)memRecycleStore + 16;		//счетчик, стопор и заглушка для функции Recycle
	memGroupLimit = memGroupLowLimit + memGroupHighLimit;
	*memRecycleStore = (INT64)memRecycleStore + 24 + memGroupLimit * SIZEOF_INT64;
	//запрашивается виртуальный диапазон заданных размеров + служебные данные
	baseMemStore = (INT64*)VirtualAlloc(0, memStoreSize + 24 + 32, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
	*baseMemStore = (INT64)baseMemStore + 24;		//счетчик выделяемой памяти, стопор и заглушка для функции Recycle
}


INT64 memAlloc(INT64 threadNum, INT64 size, INT64 param)
{
	INT64 workPtr = 0, *chainPtr = 0, sizeGroup = 0;
	INT64 tempVal = 0;

	//проверка подмножеств малых и больших отрезков
	if (size < memGroupPartition)
		sizeGroup = size / (memGroupPartition / memGroupLowLimit);			
	else sizeGroup = size / ((memStoreSize - memGroupPartition) / memGroupHighLimit) + memGroupLowLimit;

//поиск среди групп размерности
groupSearch:
	//блок проверки точек входа. при получении опустошенных цепочек процедура возвращает управление этому блоку.
	for (int i = sizeGroup + 1; i < memGroupLimit; i++) {
		sizeGroup = i;
		if (memRecyclePoint[i] != 0)
			break;
	}

//поиск среди блоков отработки
getBlock:
	//сохранение указателя на группу размерности
	chainPtr = memRecyclePoint[sizeGroup];
	//если группы подходящей размерности отсутствуют, новый отрезок берется из запаса.
	if ((INT64)chainPtr == 0)
		goto getStore;

	//если блок одноразовых указателей достиг предела, он вырезается.
	if (chainPtr[-3] >= memBlockLimit) {
		//процедура прокручивает цепь на одну позицию
		tempVal = threadCompareExchange(threadNum, &memRecyclePoint[sizeGroup], &memRecycleStore[1], chainPtr[-4], (INT64)chainPtr);
		//поток, вырезавший одноразовый блок отработки из цепи, встраивает его в цепочку неактуальных блоков.
		if (tempVal == (INT64)chainPtr) {
		blockRecycle:
			//цепочка имеет одну точку входа, встраивание происходит от начала цепи. значение сохраняется для сверки.
			chainPtr[-4] = tempVal = memRecycleBlock;
			//в случае параллельной перезаписи, процедура повторится.
			if (threadCompareExchange(threadNum, &memRecycleBlock, &memRecycleStore[1], &chainPtr[-4], tempVal) != tempVal)
				goto blockRecycle;
		}
	}

	//поиск в цепочке указателей
	tempVal = chainPtr[-3];
	//проверка соответствия числа помещенных в блок указателей и извлеченных с контролем превышения
	if (tempVal < chainPtr[-2] && tempVal < memBlockLimit) {
		//итерация счетчика извлеченных указателей
		tempVal = threadCompareExchange(threadNum, &chainPtr[-3], &chainPtr[-1], tempVal + 1, tempVal);
		//указатель по полученному номеру сохраняется
		workPtr = chainPtr[tempVal];
		if (tempVal == 0 || tempVal >= memBlockLimit || (INT64)workPtr == 0) {
			//в случае несоответствия номера или неактуальности указателя чтение цепочки продолжится.
			workPtr = 0;
			goto getBlock;
		}

		//фрагментация отрезка памяти
		//несоответствие доступного размера отрезка и заявленного требует дополнительной обработки
		tempVal = *(INT64*)(workPtr - 8);	
		//если отрезок слишком велик, процедура образует два отрезка. Ненужный будет возвращен в свою группу размерности.
		if (tempVal - size > memGroupPartition) {
			*(INT64*)(workPtr + size + 24) = tempVal - size - 32;
			*(INT64*)(workPtr + tempVal) = (INT64)workPtr + size + 32;
			*(INT64*)(workPtr + tempVal + 8) = *(INT64*)(workPtr + tempVal + 16) = 0;
			//отделенный лишний отрезок уходит на переработку
			memRecycle(threadNum, (INT64)workPtr + size + 32, &memNoDefrag);
		}
		//размер найденного отрезка может оказаться больше заявленного, происходит индексирование.
		else size = *(INT64*)(workPtr - 8);

//индексирование количества фрагментированных отрезков для статистики
memFragmentCount[threadNum]--;
	}

	//работа с неразмеченым запасом
	//если рабочий указатель пуст, новый отрезок берется из запаса.
	if ((INT64)workPtr == 0) {
	getStore:
		//если просканированы не все регулярные точки входа, процедура возвращается на дополнительную проверку.
		if (sizeGroup < memGroupLimit - 1)
			goto groupSearch;
		//если в запасе не остается достаточного пространства, процедура возвращает ошибку размещения.
		if (baseMemStore[0] + size > (INT64)baseMemStore + memStoreSize + 24 + 32)
			return 0;
		//при отсутствии доступных отрезков в отработке, он берется из запаса неразмеченной памяти.
		workPtr = threadExchangeAdd(threadNum, &baseMemStore[0], &baseMemStore[1], size + 32) + 8;
	}

	//настройка отрезка
	//указатели на блок отработки сбрасываются
	*(INT64*)(workPtr + size + 8) = *(INT64*)(workPtr + size + 16) = 0;
	//указатель на начало отрезка
	*(INT64*)(workPtr + size) = (INT64)workPtr;
	//рабочий размер отрезка
	*(INT64*)(workPtr - 8) = size;

	//побайтовое обнуление рабочей области, при наличии необходимой команды.
	if (param == &memClear) {
		for (INT64 i = 0; i < size; i++)
			*(INT8*)(workPtr + i) = 0;
	}

	//возвращается указатель на нулевую точку рабочей области
	return workPtr;
}


void memRecycle(INT64 threadNum, INT64 ptr, INT64 param)
{
	INT64 workPtr = ptr, *chainPtr = 0, tempSize = 0, sizeGroup = 0;
	INT64 *tempPtr = 0, tempVal = 0;

	//в случае передачи в процедуру предварительно фрагментированного отрезка, дефрагментация пропускается
	if (param == &memNoDefrag)
		goto getBlock;

	//дефрагментация освобожденных ранее отрезков
	for (;;) {
		//сохранение указателя на блок отработки в позади идущем отрезке. его наличие означает что отрезок свободен.
		tempPtr = *(INT64*)(workPtr - 16);
		//сверка указателей блока отработки и начала текущего отрезка. при соответствии отрезок считается отработанным.
		if ((INT64)tempPtr == 0 || *tempPtr != *(INT64*)(workPtr - 32))
			break;
		tempVal = threadExchange(threadNum, (INT64)tempPtr, &memRecycleStore[1], 0);
		//если указатель на отрезок в блоке отработки был обнулен в параллельном потоке, процедура завершает поиск
		if (tempVal == 0)
			break;
		//при объединении отрезков, размер текущего прибавляется к следующему ниже в диапазоне
		tempSize = *(INT64*)(workPtr - 8);
		workPtr = *(INT64*)(workPtr - 32);
		*(INT64*)(workPtr - 8) += (tempSize + 32);

//индексирование количества фрагментированных отрезков для статистики
memFragmentCount[threadNum]--;
	}
	//итоговый размер обрабатываемого отрезка дополнительно сохраняется после всех преобразований
	tempSize = *(INT64*)(workPtr - 8);

	for (;;) {
		//сохранение указателя на блок отработки во впереди идущем отрезке. его наличие означает что отрезок свободен.
		tempVal = *(INT64*)(workPtr + tempSize + 24);
		//проверка заполнения следующего блока
		if (tempVal == 0)
			break;
		//размер впереди идущего отрезка прибавляются сразу, все дальнейшие вычисления идут с необходимым смещением.
		tempSize += tempVal + 32;
		tempPtr = *(INT64*)(workPtr + tempSize + 16);
		//сверка указателей блока отработки и начала текущего отрезка. при соответствии отрезок считается отработанным.
		if ((INT64)tempPtr == 0 || *tempPtr != *(INT64*)(workPtr + tempSize))
			break;
		tempVal = threadExchange(threadNum, (INT64)tempPtr, &memRecycleStore[1], 0);
		//если указатель на отрезок в блоке отработки был обнулен в параллельном потоке, процедура завершает поиск
		if (tempVal == 0)
			break;
		//при объединении заранее проиндексированный размер фиксируется в обрабатываемом отрезке
		*(INT64*)(workPtr - 8) = tempSize;

//индексирование количества фрагментированных отрезков для статистики
memFragmentCount[threadNum]--;
	}

//настройка блоков указателей
getBlock:
	//итоговый размер обрабатываемого отрезка дополнительно сохраняется после всех преобразований
	tempSize = *(INT64*)(workPtr - 8);
	//проверка подмножеств малых и больших отрезков
	if (tempSize < memGroupPartition)
		sizeGroup = tempSize / (memGroupPartition / memGroupLowLimit);
	else sizeGroup = tempSize / ((memStoreSize - memGroupPartition) / memGroupHighLimit) + memGroupLowLimit;
	//превышение пределов размерностей переводит указатель на крайнюю группу
	sizeGroup = (sizeGroup >= memGroupLimit) ? memGroupLimit - 1 : sizeGroup;	
	
	chainPtr = tempPtr = memRecyclePoint[sizeGroup];
	//если точка входа указывает на существующую цепочку, цикл делает прыжок к ближайшему блоку со свободными позициями.
	while ((INT64)chainPtr != 0) {
		if (chainPtr[-2] < memBlockLimit - 1)
			goto recycle;
		chainPtr = tempPtr = chainPtr[-4];
	}

//создание блока отработки при переполнении или отсутствии таковых в выбранной группе размерности
createBlock:
	//при наличии функция повторно использует потерявшие актуальность блоки отработки, в противном случае берет из запаса.
	if (memRecycleBlock != 0) {
		tempPtr = threadPointerScroll(threadNum, &memRecycleBlock, &memRecycleStore[1], &memRecycleBlock);
		if ((INT64)tempPtr == &pError)
			goto createBlock;
		//если попытка взять отработанный блок оказалась успешной, указатель на него прокручивается до нужной позиции.
		tempPtr = (INT64)tempPtr + 4 * SIZEOF_INT64;
	}
	else tempPtr = threadExchangeAdd(threadNum, &memRecycleStore[0], &memRecycleStore[1], (memBlockLimit + 4) * SIZEOF_INT64) + 4 * SIZEOF_INT64;
	//служебные данные нового блока отработки сбрасываются
	tempPtr[0] = tempPtr[-1] = tempPtr[-2] = tempPtr[-3] = tempPtr[-4] = 0;

//отработка отрезка
recycle:
	//удержание стопора в блоке отработки
	threadStopperHold(threadNum, &tempPtr[-1]);
	tempVal = tempPtr[-2];
	if (tempVal < memBlockLimit) {
		//после необходимых преобразований указатель на начало отрезка модифицируется
		tempPtr[tempVal] = *(INT64*)(workPtr + tempSize) = workPtr;
		//указатель на блок отработки и позицию для сброса при дальнейшей дефрагментации
		*(INT64*)(workPtr + tempSize + 8) = (INT64)tempPtr;
		*(INT64*)(workPtr + tempSize + 16) = &tempPtr[tempVal];
		tempPtr[-2] = tempVal + 1;
	}
	//освобождение стопора блока
	remBit(&tempPtr[-1], threadNum);
	//если на момент записи блок оказался переполнен, создается новый.
	if (tempVal >= memBlockLimit)
		goto createBlock;

	//реструктурирование цепочки размерных групп
	//при создании нового блока указателей, необходимо встроить его в цепь
	if ((INT64)tempPtr != (INT64)chainPtr) {
	makeChain:
		tempVal = &memRecyclePoint[sizeGroup];
		chainPtr = *(INT64*)tempVal;
		//цепь прокручивается к крайнему блоку
		while ((INT64)chainPtr != 0) {
			//если в цепи встретился добавляемый блок, значит встраивание уже произошло успешно.
			if (chainPtr[-4] == (INT64)tempPtr)
				goto finish;
			tempVal = &chainPtr[-4];
			chainPtr = *(INT64*)tempVal;
		}
		tempVal = threadCompareExchange(threadNum, tempVal, &memRecycleStore[1], (INT64)tempPtr, 0);
		//в случае увеличения цепочки параллельным потоком процедура делает еще круг и повторяет встраивание
		if (tempVal != 0)
			goto makeChain;
	}

finish:
//индексирование количества фрагментированных отрезков для статистики
memFragmentCount[threadNum]++;
	return;
}


INT64 memPtrCheck(INT64 *ptr)	//процедура проверки адреса на соответствие диапазону памяти кучи
{
	if ((UINT64)ptr > (UINT64)baseMemStore && (UINT64)ptr <= (UINT64)*baseMemStore)
		return *ptr;
	return &pInvalidAddress;
}


Многопоточное тестирование
Чтобы выяснить как менеджер ведет себя в реальных условиях запустим цикл из 10000 шагов с выделением отрезков памяти случайного размера, генерируемого функцией rand(). Напишем процедуру timeTestSub() и запустим в восьми потоках одновременно. Точное время выполнения попробуем замерить методами QueryPerformanceCounter/Frequency. Дабы быть уверенным, что потоки честным образом выполнялись одновременно используем счетчики пересечений, фиксирующие количество перезапусков threadExchange-процедур. А чтобы наблюдать сложность структуры и фрагментированность кучи используем счетчик цепи, показывающий количество фрагментов, находящихся в отработке.

LARGE_INTEGER timerFrequency;
double timeResult[8];
double timeSum = 0;
INT64 testPtr[8][100];
HANDLE ghEvents[8];
HANDLE hHeap;
INT64 testStpr = 0;
INT64 testVal = 0;
INT64 repeatCounter = 0;
INT64 fragmentCounter = 0;
INT64 timeTestFailure = 0;

void timeTestSub(INT64 threadNum)
{
	INT64 switcher = 1;
	LARGE_INTEGER timerStart;
	LARGE_INTEGER timerStop;
	QueryPerformanceCounter(&timerStart);

	for (int i = 0; i < 100; i++) {
		for (int i2 = 0; i2 < 100; i2++) {
			if (switcher > 0) {
				testPtr[threadNum][i2] = memAlloc(threadNum, rand(), 0);
				//testPtr[threadNum][i2] = malloc(rand());	//путем переключения строк тестируем каждый метод
				//testPtr[threadNum][i2] = (INT64)HeapAlloc(hHeap, 0, rand());
			}
			else {
				memRecycle(threadNum, testPtr[threadNum][i2], 0);
				//free(testPtr[threadNum][i2]);
				//HeapFree(hHeap, 0, (LPVOID)testPtr[threadNum][i2]);
			}
		}
		switcher *= -1;
	}

	QueryPerformanceCounter(&timerStop);
	timeResult[threadNum] = timerStop.QuadPart - timerStart.QuadPart;
	timeResult[threadNum] = timeResult[threadNum] / timerFrequency.QuadPart;
	SetEvent(ghEvents[threadNum]);
}

int main()
{
	hHeap = HeapCreate(0, 1024 * 256 * 4096, 0);
	QueryPerformanceFrequency(&timerFrequency);
	for (int i = 0; i < 8; i++)
		ghEvents[i] = CreateEvent(NULL, FALSE, FALSE, NULL);
	for (int i = 0; i < 8; i++) {
		_beginthreadex(0, 0, timeTestSub, (void*)i, 0, 0);
	}
	WaitForMultipleObjects(8, ghEvents, TRUE, 10000);
	for (int i = 0; i < 8; i++) {
		if (timeResult[i] == 0)
			timeTestFailure += 1;			//ошибка! один из потоков не завершен.
		timeSum += timeResult[i];
		repeatCounter += threadRepeatCounter[i];
		fragmentCounter += memFragmentCount[i];
	}
}


Комментарии (72)


  1. slonopotamus
    03.03.2016 20:56
    +9

    Чем оно лучше malloc/free из libc, tcmalloc, nedmalloc, hoard, jemalloc и остальных? Чем хуже и так понятно.


    1. Tatuin
      03.03.2016 21:17
      -2

      Посмотрим что на счет кучи говорит нам википедия: "Информация о свободных и занятых областях кучи может храниться в списках различных форматов. От выбранного формата списка напрямую зависит производительность функций, подобных malloc() и free(), так как большую часть времени эти функции тратят на поиск по списку подходящих областей." и т.д.
      Есть подозрение, что википедия пытается поведать о существовании разницы в особенностях реализации списков указателей и поиска свободных областей. Однако существует мнение, что некоторые люди, способные увидеть другой смысл в этом определении.


      1. stack_trace
        03.03.2016 23:49
        +1

        Что разница есть и так всем понятно. Вопрос стоял о том, в чём эта разница заключается. Ну и вообще, интересна область применения.


        1. Tatuin
          05.03.2016 11:26

          Текст дополнен, добавлены разъяснения и многопоточные тесты с выделением отрезком случайных размеров.


      1. babylon
        04.03.2016 00:41

        Татя, отличная штукенция. Что будет происходить условно при array.pop[obj]? Как поведет себя менеджер? Что он заметит или не заметит?


  1. Godless
    03.03.2016 21:36
    +3

    мне кажется заслуживает внимания, но! я тоже очень люблю и уважаю асм, думаю было бы круто оформить эти куски сишным кодом. Ведь при должном аккуратном коде он превратится в ни чуть не худший асм. Проблемные инструкции можно и вынести, предварительно подумав о других платформах окромя X86_64. красиво оформить в виде либы выкласть какой-нить гитсвнхг.


    1. arabesc
      04.03.2016 05:31

      Как-то так:

      int cnsBit(INT64* stopper, INT64 threadNum)
      {
          if (threadNum < 0)
              return (0 != InterlockedCompareExchange64(stopper, threadNum, 0)) ? 0 : 1;
      
          if (*stopper != 0)
              return 0;
      
          ((INT8*)stopper)[threadNum] = 1;
      
          if (*stopper  !=  1 << 8*threadNum)
          {
              ((INT8*)stopper)[threadNum] = 0;
              return 0;
          }
      
          return 1;
      }


    1. Tatuin
      05.03.2016 11:30
      +1

      К сожалению язык C сильно ограничен в тех возможностях, в которых силен ASM. На C слишком сложно бывает реализовать неразрывные в плане параллельного пересечения логические последовательности.


      1. beeruser
        05.03.2016 18:45

        >> На C слишком сложно бывает реализовать неразрывные в плане параллельного пересечения логические последовательности.

        Логические последовательности? Процессор выполняет их в другом порядке всё равно.
        Если вы о последовательности операций с памятью, для этого существует ключевое слово volatile.
        Почему вы его не используете?

        Что вообще значит «неразрывные»? В любой момент может произойти переключение контекста, которое разорвёт вашу операцию.


        1. arabesc
          05.03.2016 18:58

          Если вы о последовательности операций с памятью, для этого существует ключевое слово volatile.
          Это инструкция компилятору, но не процессору. Семантика Release/Acquire для volatile специфична для компилятора, например, MSVC в режиме генерации кода для не ARM, и в стандарте не оговорена. Опять же, модель памяти C/C++ до недавнего времени не учитывала многопоточность, поэтому, полезность volatile здесь стремится к нулю.


        1. Tatuin
          05.03.2016 19:38

          Речь идет лишь о логической последовательности, которая может быть нарушена при интерпретации с одного языка на другой. Поэтому если есть желание, чтобы ступенчатый алгоритм не преподносил неожиданностей, лучше писать его на ассемеблере.


      1. Godless
        09.03.2016 09:01
        +1

        Вы никогда не смотрели во что превращается сишный код в отладчике? Попробуйте. В нежных руках Си очень аккуратно превращается в асм. Бесспорно есть разница, но посмотрите для себя велика ли она. Почувствуйте как локальные переменные ложатся на стек, как тонны указателей аккуратно перемещаются простыми mov, объекты копируются целыми push push call, if(){} превращается в аккуратный cmp+jnz, и т.д.

        Я Вас понимаю, что repnz scasb эффективнее for() if(), но в сегодняшних реалиях, действительно ли несколько микросекунд важнее переносимости?

        ЗЫ: кстати оптимизатор не так глуп как кажется. Иногда он очень грамотно сворачивает целые блоки, но да, постоянно на него надеяться не стоит, ваш Кэп.


  1. Chumicheff
    03.03.2016 23:44
    -3

    Код — просто загляденье и радует глаз. Комментарии по делу. "Ляпота!" одним словом)


  1. Kolyuchkin
    04.03.2016 00:03

    Автору, большое спасибо, за проделанную работу, но… У меня вопрос: «Сможет ли Ваш менеджер кучи „подружиться“ с DMA, например? DMA ведь плевать на правильное выделение-освобождение памяти...»


    1. Tatuin
      04.03.2016 11:42

      что имеется ввиду под DMA — direct memory access или dynamic memory? В каком контексте с DMA нужно подружиться ?


      1. Kolyuchkin
        05.03.2016 16:50

        DMA — direct memory access. Можно ли Вашим менеджером безопасно выделить память из кучи для взаимодействия с устройством через DMA? Я спрашиваю в том контексте, не поломается ли куча из-за «неадекватного» (в процессе отладки самого устройства и драйверов для него) поведения устройства?


        1. Tatuin
          06.03.2016 11:30

          В данной реализации скорее нет, поскольку служебные данные отрезков хранятся непосредственно в одном диапазоне рядом с ними. При случайном затирании ломается вся структура. Однако ничто не мешает писать служебные данные в отдельный диапазон, например туда где размещается цепочка блоков отработки. Для этого нужно будет подвергнуть алгоритм незначительной модификации, без смены основной парадигмы.


          1. Kolyuchkin
            07.03.2016 01:27

            Спасибо за ответ) Я предполагал такое решение, хотя оно потребует дополнительного расходования памяти.


  1. semenyakinVS
    04.03.2016 00:18
    +1

    Автору за проделанную работу спасибо. Выглядит масштабно, код оформлен красиво… Да и ещё и потокобезопасность организована. В общем, видно что автор старался.

    Теперь о расстроившем… Просьба как-нибудь тезисно выделять в статье преимущества (и недостатки, если таковые имеются) своего решения относительно существующих. По пунктам. Если возможны какие-то численные метрики для оценки решения — очень круто было бы видеть графики. Я ни в коем случае не хочу никого обидеть, но иногда отпадает всякое желание читать большую статью когда в ней не выделено — хотя бы в начале — о чём она и с какой целью её писал автор.

    Спасибо за понимание и ещё раз спасибо за статью!

    П.С.: Преимущества в стиле "написал чтобы научиться" или "написал из любви к искусству" тоже катят и часто вполне заслуживают статьи — если по решению можно действительно научить или оно действительно красиво.


    1. Tatuin
      04.03.2016 11:33
      +1

      текст дополнен, добавлены разъяснения и тесты.


      1. semenyakinVS
        04.03.2016 13:26

        Ага, спасибо. Результаты действительно впечатляющие — оптимизация больше чем в сто раз!

        По поводу форматирования статьи… Как мне кажется (сужу по себе), люди прежде чем читать внимательно, хотят обзорно от начала до конца просмотреть что здесь будет. Попробуйте с позиции такого читателя просмотреть текст — вам станет ясно что я имею в виду.

        По предложениям ещё. Если будет возможность, на мой взгляд, было круто, если бы вы ещё выделили как-нибудь жирным шрифтом результат (а ещё лучше, в сравнительную табличку положили бы его). Плюс разъяснения о том, почему хотелось такое сделать тоже как-то в отдельный блок в начале вынесите. Что-то вроде такого, по пунктам:

        1. Существующие аллокаторы плохо работают с многопоточными вызовами (ссылка на пруф).
        2. Контроль шагов вызова.
          и.т.д.

        И, если в идеале, тезисно в конце опишите (так же по пунктам) почему так круто вышло. Ваша статья от этого намного лучше станет, гарантирую. Главное, текст можно не менять, только форматирование сделать лучше, структурнее — и всё заиграет новыми красками.

        Ещё раз спасибо за понимание. Я ни в коем случае не хочу придираться из самоцели придраться, сама статья очень крутая (я такую вряд ли смог бы написать, во всяком случае...)!


  1. degs
    04.03.2016 00:54
    +2

    У вас описывается одновременно и алгоритм и его реализация, получается ни то ни се, очень путано.
    Опишите лучше детально алгоритм (и да, присоединяюсь к остальным, сравните с существующими), а уже потом можно и о реализации поговорить.


    1. Tatuin
      04.03.2016 11:30

      текст дополнен, добавлены разъяснения и тесты.


  1. mejedi
    04.03.2016 01:47
    +7

    Все это крайне сомнительно. На современных процессорах (>486) блокировка шины используется только в исключительных ситуациях (невыравненный доступ, либо диапазон адресов с отключенным кешем, в userspace программах не встречается).

    Implementing Scalable Atomic Locks for Multi-Core Intel® EM64T and IA32 Architectures

    In the days of Intel 486 processors, the lock prefix used to assert a lock on the bus along with a large hit in performance. Starting with the Intel Pentium Pro architecture, the bus lock is transformed into a cache lock. A lock will still be asserted on the bus in the most modern architectures if the lock resides in uncacheable memory or if the lock extends beyond a cache line boundary splitting cache lines. Both of these scenarios are unlikely, so most lock prefixes will be transformed into a cache lock which is much less expensive.

    Синхронизация кешей в x86 multicore системах основана на bus snooping (прослушивание транзакций с памятью других ядер) и MOESI протоколе.

    В предложенном алгоритме, происходит изменение байта по смещению 0..7 в куске памяти, куда пишут несколько потоков конкурентно. Один байт в память записать нельзя, гранулярность обмена с памятью — 32 байта также как и гранулярность блокировок кеша, и значит MOESI протокол все равно задействован. То есть производительность не лучше, чем в случае прямого использования атомарных инструкций.

    Также стоит отметить, что для проверки состояния "блокировок" используется чтение после записи. Работает это потому, что x86 не переупорядочивает записи (с разных ядер) между собой, и записи с чтениями. На других архитектурах это не так, например на Arm это портировать нельзя, хотя аналоги всех инструкций присутствуют.

    Наконец, хочу отметить, что начиная с Haswell, появилась поддержка software transactional memory. Это исключительно крутая штука! Проблема атомарных инструкций — в том, что атомарно можно поменять всего 8 байт по выравненному адресу. Например, во многих lock-free структурах нужно атомарно изменить сразу несколько указателей, причем в памяти они лежат совсем не подряд.

    Собственно идея software transactional memory проста — объединение нескольких операций с памятью в транзакцию на уровне железа. Транзакция откатывается, если возникают конфликты с другими потоками. При этом все изменения откатываются и происходит переход на метку, заданную при старте транзакции. Есть ограничения на число операций (не помню сколько), и page fault приводит к безусловному откату транзакции.

    Мне очень хотелось написать, что автор застрял в эре 486 и игнорирует последние 25 лет прогресса в области процессоростроения, но вместо этого, я попрошу предоставить бенчмарк, который подтвердит преимущество предложенного аналога атомарных инструкций.


    1. arabesc
      04.03.2016 04:18

      Наконец, хочу отметить, что начиная с Haswell, появилась поддержка software transactional memory.
      Если речь про TSX, то оно оказалось бажное, отключено в обновлениях микрокода.


    1. arabesc
      04.03.2016 05:28

      Также стоит отметить, что для проверки состояния «блокировок» используется чтение после записи. Работает это потому, что x86 не переупорядочивает записи (с разных ядер) между собой, и записи с чтениями.
      Справедливо для записи/чтения по одинаковым адресам.
      Intel® 64 and IA-32 Architectures Software Developer’s Manual
      8.2.3.4 Loads May Be Reordered with Earlier Stores to Different Locations
      The Intel-64 memory-ordering model allows a load to be reordered with an earlier store to a different location.
      However, loads are not reordered with stores to the same location.
      Если адрес INT64 *stopper выровнен, а код выполняется в x64, может, пронесёт, иначе — факир был пьян и фокус неудался.
      Кроме того, нельзя забывать про модель памяти языка, которая, если однопоточная, вполне может допускать оптимизации по переупорядочиванию инструкций компилятором, лишь бы поведение не менялось с точки зрения выполнения однопоточной программы.

      Короче говоря, от автора требуется доработка решения до более менее переносимого или существенные оговорки и огораживания по условиям применения кода.


      1. beeruser
        05.03.2016 17:07

        >> Справедливо для записи/чтения по одинаковым адресам.
        >> Loads May Be Reordered with Earlier Stores to Different Locations
        То что вы привели называется memory disambiguation и это техника спекулятивная — она не меняет логику работы с точки зрения программы.
        Порядок _записи_ в память не меняется от этого и модель strict-ordering не нарушается.
        В отличии от weak-ordered архитектур (все RISC), которые могут писать данные в память в произвольном порядке.


        1. arabesc
          05.03.2016 17:46

          это техника спекулятивная — она не меняет логику работы с точки зрения программы
          Нужно уточнить, что не меняет логику работы с точки зрения однопоточной программы.
          Проблема для мультитреда на примере алгоритма Петерсона рассматривается, например, здесь: Who ordered memory fences on an x86?


    1. Tatuin
      05.03.2016 12:41

      О когерентности кэша к сожалению знаю слишком мало. Дело в том, что при попытке одновременной перезаписи разделяемых переменных параллельными потоками у меня происходит затирание данных. Как в этом случае задействовать функционал когерентности мне неизвестно. Вследствие чего приходится использовать различные методы разделения доступа. Блокировка шины — один из них (хотя методов безусловно больше). Дело в том что блокировка шины работает немного (совсем чуть чуть) медленее, нежели мой метод, использующий строгую неразрывную логическую последовательность написанную на ASM. Конечно это не атомарная операция, однако алгоритм не позволяет вклиниться одновременно более чем одному потоку в запись разделяемых данных, отправляя в ожидание и перезапуск все пересекающиеся потоки. Несмотря на то, что на перезапуски безусловно тратится драгоценное время, это допущение не является недостатком, поскольку все равно работает быстрее InterlockedExchange-методов в одинаковых условиях. Тесты я приводил интересующемуся человеку в комментариях ниже.
      Если вы знаете о существовании более производительных методов, либо возможности задействования встроенных аппаратных средств для защиты разделяемых данных, безусловно интересно было бы послушать.


      1. mejedi
        06.03.2016 13:10
        +1

        На счет когерентности кеша. Призываю задуматься вот о чем. Процессор работает с памятью не напрямую, это медленно, а через кеш. Если запрошенный адрес присутствует в кеше, данные будут прочитаны оттуда. То же самое с записью — изменения происходят сначала в кеше и к памяти "применяются" не сразу.

        Допустим есть 8 ядер, каждое со своим кешем. Это 8 независимых кешей. Возможна такая ситуация. Ядро #1 пишет по адресу $100 значение 1, а потом ядро #2 читает по тому же адресу значение 42, а вовсе не 1 — потому что ранее у ядра #2 закешировалось именно это значение, и запись ядра #1 никак не повлияла на кеш ядра #2.

        Собственно "когерентность", или согласованность — это обещание, что такие ситуации не случаются, и все N кешей согласованны между собой, базовое свойство архитектуры, включено всегда. Реализовано это может быть по-разному, например через MOESI протокол. Базовая идея — в любой момент времени адрес X может быть либо не входит ни в один кеш, либо закеширован 1 или более ядрами в shared-режиме, либо закеширован ровно 1 ядром в exclusive-режиме. В shared режиме разрешены только чтения, в exclusive — еще и запись, за детальным описанием — в википедию.

        Допустим, N ядер конкурентно модифицируют и читаю один и тот же адрес (помним, что гранулярность кеша — около 32 байт). Чтобы модифицировать значение по адресу, нужно перевести свою ячейку кеша в E режим, при этом, она будет удалена из кешей других ядер. Потом, тоже самое делает другое ядро, и теперь наше чтение тормозит — мы потеряли линейку кеша.

        Этот пинг-понг — не то, чтобы очень эффективный. Аппаратный CAS (compare and swap), он за одну итерацию MESI протокола отработает, даже если одновременно конкурируют over-дофига потоков.


  1. DustCn
    04.03.2016 02:51
    +2

    Шустрость не показана…


    1. Tatuin
      04.03.2016 11:28

      текст дополнен, добавлены разъяснения и тесты.


      1. arabesc
        04.03.2016 13:56
        +2

        Как изменится производительность, если самописные клоны interlocked функций заменить на стандартные?
        К слову, когда речь идет о миллисекундах, GetTickCount() не самый удачный инструмент измерения времени, лучше использовать, например, QueryPerformanceFrequency/Counter.


        1. Tatuin
          05.03.2016 12:06

          Наверно проще будет проверить производительность непосредственно Interlocked в сравнении с клонами на основе стопоров, не задействуя при этом аллоцирование памяти. Приведу вам результаты многопоточных тестов, добавленных в конце статьи. Запустим тестовую процедуру в восьми потоках одновременно. Точное время выполнения попробуем замерить методами QueryPerformanceCounter/Frequency. Разница будет лишь в том, что вместо цикла malloc/free используется такой цикл:

          for (int i = 0; i < 1000000; i++) {
              InterlockedExchangeAdd64(&testVal, 1);
              //threadExchangeAdd(threadNum, &testVal, &testStpr, 1); //путем переключения строк тестируем каждый метод
          }

          результаты таковы:
          threadExchangeAdd
          время: кол-во перезапусков:
          0.73081663904651450 146
          0.78443012725229555 186
          0.80859421647281526 174
          0.85626781261663143 208
          0.82452533427415353 142
          средние значения:
          0,800926825932482 171,2

          InterlockedExchangeAdd64
          время: кол-во перезапусков:
          1.0145192069211530 0
          1.0156000842383457 0
          1.2575424109109752 0
          1.1261166428909948 0
          1.0498880847990273 0
          средние значения:
          1,092733285952099 0


          1. Tatuin
            05.03.2016 12:12

            также важно добавить что в процедуре threadExchangeAdd ожидание в виде Sleep(1) заменено на цикл вида:

            INT64 threadSleepTimer = 1000;
            for (int i = 0; i < threadSleepTimer * (threadNum + 1); i++)
                Sleep(0);

            Поскольку ожидание при перезапуске длиной в 1 миллисекунду это огромный интервал времени.


            1. arabesc
              05.03.2016 15:10

              Это не совсем честно, т.к. одна из множества конкурирующих interlocked функций гарантированно захватит свободный стопор, а вот конкурирующие за свободный стопор клоны могут все скопом уйти в sleep(0/1), оставив стопор свободным.


          1. arabesc
            05.03.2016 15:14

            Разница порядка ~ 30%, для масштаба нужно типичное время одной аллокации/деаллокации, может, на его фоне это ни о чем. Тем более, что все равно в текущем виде реализация клонов не обеспечивает надежной работы.


          1. mejedi
            06.03.2016 13:32

            Сколько потоков способно выполняться одновременно в тестовой системе, есть ли там 8 честных ядер?

            Возможно, люди захотят повторить этот тест, где исходники?


            1. Tatuin
              06.03.2016 13:47

              Тест в конце статьи. Разница лишь в том, что вместо цикла malloc/free используется такой цикл:

              for (int i = 0; i < 1000000; i++) {
                  InterlockedExchangeAdd64(&testVal, 1);
                  //threadExchangeAdd(threadNum, &testVal, &testStpr, 1); //путем переключения строк тестируем каждый метод
              }

              Тесты проводились на Intel core i5-3330 (4 ядра, Hyper-threading). Однако при проверке на планшете acer w700 (corei3 2 ядра), замечено что Interlocked на нем работает немного быстрее, нежели предложенный мной метод. На 4-х ядерном же наоборот. Наблюдается ситуация описанная на вики в статье Программная_транзакционная_память:
              "Однако на практике SТМ-системы проигрывают в производительности мелкомодульным системам, основанным на блокировках, на небольшом количестве процессоров (от 1 до 4 в зависимости от приложения). Это связано в первую очередь с накладными расходами на поддержку лога и с затрачиваемым временем на совершение транзакций. Но даже в этом случае прозводительность отличается не более, чем в 2 раза.[3] Сторонники STM считают, что такие потери оправданы концептуальными преимуществами SТМ."


              1. mejedi
                06.03.2016 14:28

                Тест в конце статьи.

                Ок, а нельзя это как то более удобно оформить? Архив или (лучше) репозиторий на гитхабе.


  1. chabapok
    04.03.2016 15:52

    Если нормальных тестов нет, то скорей всего что-то где-то сломано. Причем, написание нормальных тестов гораздо сложней и масштабней, обычно. Тем более, многопоточность и какой-то новый хитрый алгоритм. Такое без нормальных тестов считается нерабочим заведомо.


    1. arabesc
      04.03.2016 16:41

      Тесты не панацея, они не могут доказать отсутствие ошибок в недетерминированной системе, каковой является многопоточный алгоритм с гонками. Если только не опускаться на уровень эмуляции процессора с комбинаторным перебором возможных состояний, что крайне трудоёмко. Обычный анализ кода здесь выглядит более эффективным решением.


      1. chabapok
        04.03.2016 17:13
        +1

        Зато могут доказать наличие.


        1. arabesc
          05.03.2016 15:41

          А могут и не доказать, что тогда, пускать в продакшн?
          Проблему в cnsBit нашел за несколько минут через анализ кода. Условия возникновения проблемы довольно специфические, если про них не знать и не затачивать под них тест, то алгоритм может работать довольно долгое время без проблем, тест ничего не покажет, кроме того, что алгоритм имеет рабочие инварианты. Только кому нужен стохастический менеджер памяти, который в редких случаях превращает данные в тыкву?


    1. Tatuin
      05.03.2016 11:14

      добавлены многопоточные тесты с выделением отрезком случайных размеров.


      1. chabapok
        05.03.2016 13:23

        Так просто? Нет, конечно. Все гораздо сложней. Во-первых, это тесты производительности, а речь о том, корректно ли оно работает. Хотя производительность тоже интересна, но быстрый неработающий алгоритм никому не нужен.

        А именно надо проверять, что:

        • один и тот же блок памяти ни при каких обстоятельствах не может быть отдан двум запрашивающим клиентам.
        • отданный аллокатором блок должен быть всегда в пределах пула.
        • нет навсегда потерянных блоков, или если есть, то не больше граничного количества.
          Так же желательно, чтобы двойной free одного и того же блока не вызывал багов. Хотя тут возможны шаги вправо-влево.

        При этом, самое сложное кроется в условии "ни при каких обстоятельствах". Тесты должны покрывать в идеале все возможные случаи и как-то ловить плохие ситуации. Это большая задача, навскидку я не знаю как ее нормально решить. Возможно, как-то записывать лог каждого потока, а после отработки теста анализировать были ли пересечения.

        При этом, надо оттестить случаи, когда при фрагментированной куче одновременно несколько потоков просят выделить память, несколько потоков просят ее освободить, причем часть освобождаемых блоков была выделена в других потоках. Гонять тест надо долго, и желательно чтобы каждый раз относительный сдвиг исполнения между потоками был разным, и сегментация была разной.

        Кстати, ваш тест даже не гарантирует, что потоки исполнялись параллельно. То, что вы запустили несколько потоков — это еще не значит, что их выполнение было одновременным. Надо как-то мерять степень "одновременности" исполнения. У процессора есть хардварные счетчики, может как-то через них. И еще, хороший аллокатор учитывает и минимизирует false sharing.

        Так что, нормально написать тесты — задача больше и сложней, чем кажется навскидку.

        Возможно, большое время аллокации через malloc связано с тем, что ОС должна еще и мапить физическую память в логическую, а аллокатор в юзер-спейсе уже этого не делает. Или что-нибудь еще. Там куча разных эффектов при работе с памятью бывает. Например, транспонирование матрицы 512х512 в ~3 раза дольше чем 513х513, если верить stackoverflow. Почему? Я не разбирался. Но суть в том, что если есть существенный провал производительности, то это обычно или баг (в том числе баг в более быстрой версии), или какой-то хитрый эффект, про который мы не догадываемся. И пока природа тормозов не ясна, что-то однозначно утверждать — рано.


        1. grossws
          07.03.2016 14:39

          Дополню:

          Возможно, как-то записывать лог каждого потока, а после отработки теста анализировать были ли пересечения.

          А баг в этот момент вполне может исчезнуть, так что тоже не гарантирует корректность.

          Возможно, большое время аллокации через malloc связано с тем, что ОС должна еще и мапить физическую память в логическую, а аллокатор в юзер-спейсе уже этого не делает.

          Если брать linux, то malloc — тоже userspace аллокатор. Просто при нехватки свободной памяти он дёргает sbrk или mmap, чтобы получить больше, и это, естественно, дорогая операция.


  1. ion2
    04.03.2016 16:26

    У вас в тесте размер выделяемого блока линейно растёт с приращением в 1 килобайт. Интересно посмотреть результаты с произвольными размерами блоков. И самое главное — тест однопоточный. Не известно какие результаты будут при большем числе потоков.


    1. Tatuin
      05.03.2016 11:15

      добавлены многопоточные тесты с выделением отрезком случайных размеров.


      1. arabesc
        05.03.2016 22:02

        Вместо malloc / free интересно увидеть результаты сравнения с HeapCreate(0, 1024 * 256 * 4096, 0) / HeapAlloc / HeapFree.
        Возможно, тоже самое сделает опция /HEAP для кучи процесса по умолчанию, тогда можно и malloc / free.


  1. arabesc
    04.03.2016 18:43
    +4

    cnsBit в предложенном виде не исключает состояние гонки между положительным и отрицательным threadNum.
    Особенно прекрасна (конкурентная) ситуация:
    cnsBit(stopper, 0x00)
    cnsBit(stopper, 0x01)
    cnsBit(stopper, 0x02)
    cnsBit(stopper, 0x03)
    cnsBit(stopper, 0x04)
    cnsBit(stopper, 0x05)
    cnsBit(stopper, 0x06)
    cnsBit(stopper, 0x07)
    cnsBit(stopper, <любое_отрицательное_число>)
    когда последний cnsBit захватит stopper, а предыдущие, успевшие увидеть stopper свободным, лок по частям отменят записью нуля на выходе, заходи, кто хочешь. cnsBit с положительным threadNum необязательно должно быть 8, достаточно покрыть ненулевые байты в отрицательном threadNum.

    Извините, но в топку такой велосипед.


    1. Tatuin
      04.03.2016 19:05

      Немного поясню последовательность действий. Двухступенчатая установка метки состоит из строго алгоритма. Логическая неразрывность подразумевает отработку типовой ситуации и исключение незапрограммированных действий.

      Нумерованный поток, входящий в пул:
      1) проверка стопора на нуль. только свободный стопор дает разрешение на дальнейшие действия.
      2) установка метки под смещением
      3) проверка стопора по маске в которой установлен только один бит с нужным смещением. наличие других битов приводит к отмене. собственный бит обнуляется. процедура возвращает нуль, что подразумевает ожидание и перезапуск.

      Ненумерованный поток:
      1) проверка номера потока. несоответствие переводит процедуру в блок InterlockedCompareExchange.
      2) инструкция cmpxchg означает не что иное, как InterlockedCompareExchange. методом блокировки шины в стопор записывается отрицательный номер потока (заполняя все биты диапазона) с предварительной проверкой на нуль. если стопор уже заполнен другими битами, cmpxchg вернет False и процедура cnsBit вернет нуль. Ожидание, перезапуск.


      1. arabesc
        04.03.2016 19:32
        +3

        Нумерованный поток 0x07 проверяет стопор, видит, что он свободен, переходит к продолжению лочки, но в этот момент засыпает.
        Одновременно ненумерованный поток, например, 0x8100000000000000 тоже видит пока еще свободный стопор и атомарно захватывает его.
        Просыпается нумерованный поток, тупо перезаписывает 0x81 на 0x01, успешно тестирует свой битик и считает, что лок его.
        Пусть ненумерованный поток имеет вид 0x85000000000000C6, а конкурируют с ним 0x00 и 0x07. Тогда ни 0x00 ни 0x07 сразу лок не захватят, но обнулят первый и последний байт стопора (ветка "@end:; блок несоответствия"), сделав его преждевременно свободным. Дальше по индукции.

        Двухступенчатая установка метки состоит из строго алгоритма.
        Алгоритм неатомарный и пересекается по записи данных с другими потоками, отсюда рейс и неработоспособность в общем случае.


        1. Tatuin
          04.03.2016 20:38

          Чуть подробнее с иллюстрацией промежуточных значений. Пусть в гонке потоков участвуют поток №(7) и поток №(-1). Битовым значением INT64 threadNum равное (-1) будет 0xffff ffff ffff ffff. Т.е. поле заполненное битами. Представим две ситуации, что поток(7) немного запаздывает, но проходит первый шаг проверки. В это время поток(-1) при помощи блокировки шины заполняет стопор битами по всей длине с его предварительной проверкой на нуль. Поскольку поток(7) опаздывает и еще не поставил свой бит, поток(-1) получает эксклюзивный доступ к данным и начинает работу с ними. Просыпаетс поток(7), ставит свой бит под нужным смещением, затирая биты и так находящиеся там. Но на втором этапе проверки по маске выясняется, что стопор полностью забит и не соответствует маске с единичным битом под нужным смещением. Далее выход, перезапуск, до тех пор пока поток(-1) не выполнит свои действия и не обнулит стопор.

          Другой вариант поток(7) торопится и почти одновременно проходит первую ступень с потоком(-1). Ставит свой бит под нужным смещением (в стопоре хранится значение 0x0100 0000 0000 0000) и дальше либо успевает проверить соответствие маски до блокировки шины, либо нет. Поток(-1) с помощью инструкции cmpxchg пытается заполнить стопор битами, с предварительной проверкой на нуль. Поскольку стопор уже не равен нулю инструкция cmpxchg возвращает False, стопор не заполняется битами и поток(-1) возвращает нуль. Ожидание, перезапуск. Успевший поток(7) работает с разделяемыми данными, не успевший поток(7) перезапускается вместе с остальными, предварительно обнуляя свой бит.


          1. arabesc
            04.03.2016 20:45

            Пусть в гонке потоков участвуют поток №(7) и поток №(-1).
            Почему выбираете удобные значения, а не те, которые иллюстрируют проблему?
            Хотите -1, ok, но тогда пусть с этим потоком конкурируют другие восемь, от 0x00 до 0x07, а за ними любой другой. Вот теперь рассматривайте первый вариант.


            1. Tatuin
              05.03.2016 12:28

              Дело в том, что для пул потоков обычно создается для исполнения большинства процедур в программе, а вне пула работают только Main-поток, при необходимости поток прослушки сетевого интерфейса, также какой-нибудь низкоприоритетный планировщик и т.п.
              При работе вне пула имеет смысл сортировать такие потоки как: -1, -2, -3, и т.д. Мне кажется, что не имеет смысла рассматривать гипотетические ситуации с неверными исходными данными, когда главным условием использования любого метода есть точная и правильная настройка исходных данных. В самой сути использования позиционных стопоров нет противоречий поскольку это простая процедура, как раз два три. Главным условием, как указано в листинге самой процедуры, является установка бита в метку под смещением для потока из пула и полное заполнение стопора битами для потока вне пула.


              1. arabesc
                05.03.2016 15:30
                +1

                Мне кажется, что не имеет смысла рассматривать гипотетические ситуации с неверными исходными данными
                Во-первых, даже на так называемых верных исходных данных могут возникать ошибочные ситуации, выше написано как. Подозрительно, что отказываетесь это признать и исправить, вводя читателей в заблуждение и навязывая ошибочное решение. Уж не работает ли этот код в каком-нибудь ответственном месте?
                Во-вторых, решение предлагается более менее универсальное, какие данные возникнут у пользователя, не предугадать, у одного ненумерованные потоки такие же нумерованные, только отрицательные, у другого это будет сложносочиненный хендл с установленным старшим битом, где, надо заметить, читать обратно такой хендл нельзя, т.к. cnsBit может его испортить.
                Любые ограничения по поводу правильности входных данных хорошо бы явно описать, хотя, в данном случае погоды это не делает.

                В самой сути использования позиционных стопоров нет противоречий поскольку это простая процедура, как раз два три.
                Претензия не к стопорам, а к способу их выставления, который не гарантирует отсутствие гонок с последующей порчей данных.


              1. arabesc
                05.03.2016 16:15

                Вот синтетический пример, позволяющий воспроизвести ошибку в cnsBit:

                int cnsBit(INT64* stopper, INT64 threadNum)
                {
                    if (threadNum < 0)
                    {
                        Sleep(50); // force context switch
                        return (0 != InterlockedCompareExchange64(stopper, threadNum, 0)) ? 0 : 1;
                    }
                
                    if (*stopper != 0)
                        return 0;
                
                    Sleep(100); // force context switch
                
                    ((INT8*)stopper)[threadNum] = 1;
                
                    if (*stopper  !=  1 << 8*threadNum)
                    {
                        ((INT8*)stopper)[threadNum] = 0;
                        return 0;
                    }
                
                    return 1;
                }

                Sleep(...) здесь эмулируют точки останова в коде при переключении потоков. Конкретное время задержки может потребоваться отрегулировать. thread 0 должен проснуться раньше остальных, thread 1..8 должны успеть увидеть стопор свободным, но не должны успеть его модифицировать перед thread 0.

                Далее, пусть есть серия следующих одновременных (из разных потоков) вызовов cnsBit:

                INT64 stopper = 0;
                
                // thread 0
                int result = cnsBit(&stopper, -1);
                assert(1 == result);
                
                // thread 1
                int result = cnsBit(&stopper, 0);
                assert(0 == result);
                
                // thread 2
                int result = cnsBit(&stopper, 1);
                assert(0 == result);
                
                // thread 3
                int result = cnsBit(&stopper, 2);
                assert(0 == result);
                
                // thread 4
                int result = cnsBit(&stopper, 3);
                assert(0 == result);
                
                // thread 5
                int result = cnsBit(&stopper, 4);
                assert(0 == result);
                
                // thread 6
                int result = cnsBit(&stopper, 5);
                assert(0 == result);
                
                // thread 7
                int result = cnsBit(&stopper, 6);
                assert(0 == result);
                
                // thread 8
                int result = cnsBit(&stopper, 7);
                assert(0 == result);
                
                // wait for threads 0..8 completion
                
                // thread 0
                assert(0 != stopper);

                Ожидание в последнем ассерте не выполнится, стопор будет свободным, хотя должен быть захвачен 0-м потоком.


                1. Tatuin
                  06.03.2016 04:54
                  -2

                  Попробуйте перед обнулением собственной метки потока проверять переполнение битами. Должно помочь в данной ситуации. например:

                      if (*stopper  !=  1 << 8*threadNum)
                      {
                          if (*stopper != (-1)) ((INT8*)stopper)[threadNum] = 0;
                          return 0;
                      }


                  1. arabesc
                    06.03.2016 07:51
                    +3

                    Как это должно работать по-вашему?
                    Что-то очень странное предлагаете. Вместо подтверждения и исправления ошибки ищете костыли.

                    // (1)
                    if (*stopper != (-1))
                        // (2)
                        ((INT8*)stopper)[threadNum] = 0;

                    Понимаете, что в точках (1) и (2) stopper может быть изменён из других потоков на что угодно, проверка ничего не даст?


                    1. Tatuin
                      06.03.2016 10:48
                      -2

                      Именно по этой причине, алгоритм подразумевающий строгое движение от одних исключающих условий к другим условиям лучше писать на ASM. Проведем тест и запустим одновременно 9 потоков с передачей им номеров от (-1) до 7. Для проверки пересечений и перезапуска threadExchange-функций будем итерировать счетчики перезапусков. Контрольным значением, сигнализирующим о затирании разделяемых данных сделаем переменную testVal. Именно ее будут последовательно индексировать в цикле запущенные параллельно потоки. Несоответствие общей величины циклов и значение testVal будет свидетельствовать о факте затирания.

                      цикл 10000 шагов: время = 0.064928848197125896; кол-во перезапусков при пересечении = 19; testVal = 90000;
                      цикл 100000 шагов: время = 0.11636663276946491; кол-во перезапусков при пересечении = 17; testVal = 900000;
                      цикл 1000000 шагов: время = 1.0332009992926836; кол-во перезапусков при пересечении = 184; testVal = 9000000;
                      цикл 10000000 шагов: время = 9.3464465265121142; кол-во перезапусков при пересечении = 1486; testVal = 90000000;
                      цикл 100000000 шагов: время = 93.325206283581792; кол-во перезапусков = 18563; testVal = 900000000;

                      Попробуем разобраться почему не происходит затирания и каждый поток работает с данными эксклюзивно. Для этого опять пошагово заглянем в диапазон стопора и проследим за битовыми состояниями меток в разных ситуациях. Рассмотрим запуск 9 потоков с присвоенными номерами от (-1) до 7. Пусть все потоки пройдут первую ступень одновременно и перейдут к редактированию своих байтов. Поток (-1) при этом с помощью блокировки шины гарантированно захватит стопор и заполнит его битами по всей длине, переходя к эксклюзивной работе с данными. Получим первое редактирование стопора, который обретет значение 0xffff ffff ffff ffff. Остальные потоки поставят биты в свои байты под смещением, затерев находившиеся там биты и не изменив общего значения никоим образом. Проверка по маске каждого из потоков провалится и они перейдут в блок отмены изменений. Далее усложним ситуацию и представим, что поток(-1) закончил работу с разделяемыми данными и приступил к освобождению стопора. Обнуление при этом происходит сегментно, стопор содержит промежуточное значение 0xffff ffff ffff 0000. В этой ситуации потоки из пула перед сбросом своего байта проверяют стопор на заполнение битами и получают не полностью заполненный диапазон. Это разрешает сбросить свой байт и вернуть неудачу. Если поток(-1) все еще работает с данными и стопор полностью заполнен битами, это запрещает сбрасывать свой байт. Каждый поток из пула возвращает неудачу без обнуления своего байта в стопоре, после чего следует ожидание и повторные попытки. Иными словами поток не из пула имеет безусловный приоритет. Когда редактирование стопора осуществляется потоками из пула, оно разрешено совместно и одновременно, но за исключением тех случаев, когда стопор занят из потока вне пула. При этом в ситуации когда поток(-1) освобождает стопор сегментно считается, что он уже не занят и разрешает нумерованным потокам редактировать байты в нем.
                      Может создаться впечатление, что если 8 потоков с нумерами от 0 до 7 одновременно поставят биты в свои байты, то это заблокирует стопор и не даст потокам сбросить свои метки. Однако это не так. При блокировке стопора потоком (-1) вне пула значение стопора = 0xffff ffff ffff ffff, при одновременной блокировке стопора 8-ю потоками из пула значение стопора = 0x0101 0101 0101 0101.
                      ps: К сожалению на C реализовать правильную работу данной процедуры не получится именно по той причине, что это подразумевает возможность возникновения при интерпретации дополнительных условностей. Работать по замыслу она будет только на ASM.


                      1. arabesc
                        06.03.2016 16:48
                        +1

                        Остальные потоки поставят биты в свои байты под смещением, затерев находившиеся там биты и не изменив общего значения никоим образом. Проверка по маске каждого из потоков провалится и они перейдут в блок отмены изменений.

                        Напрасно ожидаете такое поведение, тихо изменив реализацию с:

                        mov byte ptr [rcx+r8],1 ;установка бита по нужному адресу
                        bts rdx,rax ;установка бита в маске по необходимому смещению

                        cmp qword ptr [rcx],rdx ;проверка метки по маске
                        jnz @end ;выход при несоответствии

                        @scess: ;блок соответствия
                        mov rax,1 ;собственный бит присутствует в диапазоне, сторонние биты отсутствуют
                        ret

                        @end: ;блок несоответствия
                        mov byte ptr [rcx+r8],0 ;удаление бита из метки

                        на:

                        mov bl, byte ptr[rcx+r8] ;сохранение значения под нужным смещением в регистр
                        bts bx,0 ;установка бита в регистр
                        mov byte ptr [rcx+r8],bl ;установка значения по нужному адресу
                        bts rdx,rax ;установка бита в маске по необходимому смещению

                        cmp qword ptr [rcx],rdx ;проверка метки по маске
                        jnz @end ;выход при несоответствии

                        @scess: ;блок соответствия
                        mov rax,1 ;собственный бит присутствует в диапазоне, сторонние биты отсутствуют
                        ret

                        @end: ;блок несоответствия
                        cmp qword ptr [rcx],-1 ;проверка на заполнение битами
                        je @fin ;немедленный выход при заполнении
                        mov byte ptr [rcx+r8],0 ;обнуление байта в стопоре
                        Одновременно добавили жесткое ограничение на единственный т.н. «ненумерованный» поток -1. В то мремя, как простой переход на interlocked функции вообще снимет ограничения на потоки в этом месте. Правда, при такой демонстрации вопиющего непонимания проблемы гонок есть серьёзные опасения за остальной код.

                        Проблема в том, что на стадии проверок:
                        mov bl, byte ptr[rcx+r8] ;сохранение значения под нужным смещением в регистр
                        cmp qword ptr [rcx],-1 ;проверка на заполнение битами
                        значение в памяти может быть любым и активно меняться, только что перед сохранением оно ноль, а после сохранения уже не ноль, перед проверкой на -1 не равно, а сразу после равно.

                        Обнуление при этом происходит сегментно, стопор содержит промежуточное значение 0xffff ffff ffff 0000.
                        Это с чего так думаете? Выровненное машинное слово пишется в память атомарно, да и при неатомарном сохранении нет никаких оснований думать, что повезет попасть именно на промежуточное значение.

                        К сожалению на C реализовать правильную работу данной процедуры не получится именно по той причине, что это подразумевает возможность возникновения при интерпретации дополнительных условностей. Работать по замыслу она будет только на ASM.
                        Нет, C vs Asm здеь не при чем, просто не понимаете, как оно работает.


                        1. Tatuin
                          06.03.2016 19:00
                          -2

                          Не обращайте пристального внимания на частности. Они могут изменяться, но основная парадигма остается. Представьте себе, что для работы программы вы создаете некоторое множество потоков + 1 main присутствует изначально, но обычно висит в ожидании пользовательского ввода. При этом часть потоков вы отделяете для оперативного выполнения возникающих задач и называете его рабочим пулом. Присваиваете им номера от 0 до 7, таким образом чтобы любые процедуры выполняющиеся в их рамках могли получать доступ к номеру. Остальным потокам, в том числе и main, присваиваете номера -1, -2, -3 и тд. Можно конечно присвоить 8, 9, 10, но тогда при проверке принадлежности рабочему пулу придется добавить верхнюю границу: if threadNum < 0 || threadNum > workPoolMax; Я считаю это излишним и не входящие в пул потоки просто получают отрицательную нумерацию. Причем поскольку разделяют данные они с помощью InterlockedExchange, то нет большой разницы какие номера им присвоены. Можно даже присвоить номер 0x81. Главное условие — не должно существовать потоков с одинаковым номером, при этом пропуски в нумерации потоков, не входящих в рабочий пул не критичны. Главное правильно отделять одни потоки от других.
                          При работе со стопорами в рамках рабочего пула у каждого потока есть свой байт и ведут ли они одновременную запись в стопор или нет, чужие данные они не затирают. Чуть сложнее ситуация становится, когда вклинивается левый поток не из пула, что само по себе является управляемой ситуацией и при желании может быть исключено. Наличие в программе потоков не входящих в рабочий пул не делает идею со стопорами не рабочей, однако безусловно влияет на итоговую реализацию кода. Также само количество потоков идейно никак не ограничивается.
                          Представим что у нас в наличии есть крутой core i7 с 8-ю ядрами и Hyper-Threading, что дает на 16 виртуальных ядер. И мы создаем пул из 16 потоков + еще 3 на прослушку портов, низкоприоритетную планировку задач, и ожидание пользователя. Под стопоры будем выделять не 8x8=64 бит, а уже 8x16=128 бит. Присваиваем рабочему пулу номера от 0 до 15, остальным присваиваем 666, 66010 и 100500. Модифицируем процедуру check and set bit таким образом, что потоки имеющие номера threadNum < 0 && threadNum > workPoolMax, считаются не входящими в пул. И вместо проверки qword ptr нам придется запускать цикл для проверки всех 16 байт в диапазоне стопора. Поскольку нам не удастся проверять и присваивать значения за одну asm-инструкцию, разумнее будет выделить для не входящих в пул потоков отдельный байт (или даже два для выравнивания). Поэтому если мы собираемся разрешить работу с разделяемыми данным потокам не входящим в рабочий пул, увеличиваем размер стопоров сразу до 10x16=160 байт. Последние 2 байта будут ничьими и принадлежат сразу всем не входящим в пул потокам и блокироваться инструкцией lock cmpxchg будут именно эти 2 байта.
                          В основе позиционирования потоков лежит простой абстрактный принцип, основывающийся на трехступенчатом подходе: 1) перед обращением к данным, процедура смотрит не занято ли место; 2) поток залезает через маленькую дырку в большой диапазон; 3) поток смотрит не оказался ли в большом диапазоне с ним кто-нибудь еще. В зависимости от этих условий происходит либо перезапуск, либо эксклюзивный доступ к данным. В этой абстрактной парадигме нет противоречий и претензии к одной строчке кода не отменяют работоспособности идеи.
                          Иными словами, если мыслить сферически, то никакие аппаратные ограничения не мешают нам, используя методику программно-транзакионной памяти, написать неблокирующий потокобезопасный алгоритм. Также по моему мнению ничто не мешает совместить в одной процедуре блокирующие и не блокирующие механизмы. И тривиальный метод позиционных диапазонов для нумерованных потоков это как раз тот случай. Не бойтесь изменять алгоритм, ведь главное понимать как работает принцип. А реализация должна подгоняться под конкретные условия, включающие требования оптимизации, требования задачи и прочие требования.


                          1. arabesc
                            06.03.2016 19:37
                            +1

                            Какая-то графомания.

                            Они могут изменяться, но основная парадигма остается.
                            Красивой парадигме мешает кривая реализация.

                            Чуть сложнее ситуация становится, когда вклинивается левый поток не из пула, что само по себе является управляемой ситуацией и при желании может быть исключено.
                            Виляете, как уж на сковородке.

                            Наличие в программе потоков не входящих в рабочий пул не делает идею со стопорами не рабочей, однако безусловно влияет на итоговую реализацию кода.
                            Именно, в текущем виде реализация не может считаться надёжной.

                            Также само количество потоков идейно никак не ограничивается.
                            Представим что у нас в наличии есть крутой core i7 с 8-ю ядрами и Hyper-Threading, что дает на 16 виртуальных ядер.
                            Не суть важно. Для гонки с фатальными последствиями достаточно одного ядра и нескольких переключений контекста исполнения в неподходящий для алгоритма момент.

                            Под стопоры будем выделять не 8x8=64 бит, а уже 8x16=128 бит.
                            Не сможете атомарно проверить 128-битный стопор.

                            И вместо проверки qword ptr нам придется запускать цикл для проверки всех 16 байт в диапазоне стопора.
                            Не будет работать без использования барьеров.

                            В основе позиционирования потоков лежит простой абстрактный принцип, основывающийся на трехступенчатом подходе: 1) перед обращением к данным, процедура смотрит не занято ли место;
                            Пусть не занято, причем, с точки зрения сразу нескольких потоков, т.к. клон InterlockedExchange неатомарный.

                            2) поток залезает через маленькую дырку в большой диапазон;
                            И такой поток необязательно один.

                            3) поток смотрит не оказался ли в большом диапазоне с ним кто-нибудь еще.
                            Смотрите сюда, процессор не гарантирует, что изменения из других потоков по другим адресам будут видны.

                            В зависимости от этих условий происходит либо перезапуск, либо эксклюзивный доступ к данным.
                            Достаточно, что есть шанс гонки с порчей данных из-за неэксклюзивного доступа. Реализация нерабочая.

                            В этой абстрактной парадигме нет противоречий и претензии к одной строчке кода не отменяют работоспособности идеи.
                            Идея имеет шанс заработать при использовании нормальных interlocked-функций. Вы пытаетесь их перехитрить, это не работает.

                            Иными словами, если мыслить сферически, то никакие аппаратные ограничения не мешают нам, используя методику программно-транзакионной памяти, написать неблокирующий потокобезопасный алгоритм.
                            Очень даже мешают. Реализация такого потокобезопасного алгоритма пока вами не представлена.

                            И тривиальный метод позиционных диапазонов для нумерованных потоков это как раз тот случай.
                            Что подразумевается под термином тривиальный?

                            А реализация должна подгоняться под конкретные условия, включающие требования оптимизации, требования задачи и прочие требования.
                            Представленная реализация даже при озвученных ограничениях не гарантирует надёжной работы алгоритма. Будет печально, если кто-то бездумно скопирует её в ответственный код.


                            1. Tatuin
                              06.03.2016 19:55

                              То есть в рамках сферического концептуализма в вакууме вы согласны, что редактирование потоками байтов в общем диапазоне, но под персональным смещениями не ведет к потере данных? И вас смущает только теоретическая возможность пересечения 9 потоков? Хорошо, тогда ограничьте рабочий пул 7-ю потоками с номерами от 0 до 6. При этом main-потоку присвойте номер 7. Теперь воспользуйтесь моей изначальной реализацией, но в начале check and set bit мысленно представьте, что условие отсеивания threadNum < 0 дополнено условием threadNum > workPoolMax. Величина стопора при этом пусть остается 64 бита. А после этого мысленно воспроизведите свой синтетический тест создающий нежелательную разблокировку. Заметьте как без изменения реализации, сложная неоднозначная ситуация меняется на противоположную. С 8-ю потоками становится легче? Восемь я считаю это очень удобное число, гораздо удобнее девяти.


                              1. arabesc
                                06.03.2016 20:20
                                +1

                                То есть в рамках сферического концептуализма в вакууме вы согласны, что редактирование потоками байтов в общем диапазоне, но под персональным смещениями не ведет к потере данных?
                                Не в курсе такого концептуализма. В статье предлагается вполне конкретный код на x64 asm, который якобы может заменить собой interlocked-операции. Но он не может, т.к. допускает потоконебезопасные инварианты.
                                Для описания сферических концепций в вакууме используйте, пожалуйста, что-нибудь более абстрактное, без претензий на детали реализации, которые на деле оказываются рабочими не в полной мере, что ставит их ценность и оправданность под вопрос.

                                И вас смущает только теоретическая возможность пересечения 9 потоков? Хорошо, тогда ограничьте рабочий пул 7-ю потоками с номерами от 0 до 6.
                                Меняете правила по ходу игры? На текущий момент код реализации cnsBit в статье остаётся потоконебезопасным, хотя описание алгоритма продолжает утверждать обратное.


                                1. Tatuin
                                  06.03.2016 20:29

                                  Как уже было указано мной в начале статьи, путем проб и ошибок удалось установить, что оптимальное количество потоков, работающих с разделяемыми данными — это 8. Большее кол-во накладывает лишь операционные издержки и создает сложности реализации. Соответственно оптимальный размер позиционного стопора — 64 бита. При том, если в рабочем пуле 8 потоков, то нецелесообразно допускать к разделяемым данным потоки вне пула. Если планируется, что они все же будут модифицировать общие данные, то рабочий пул стоит сократить. Причем вытеснение потоков из пула может быть динамическим, с помощью модифицирования workPoolMax. Крайний в пуле поток может проснуться и обнаружить, что теперь он работает с данными только через lock инструкции. В вашем примерах вы любите брать 9 потоков. Я в статье настаиваю на восьми. Если 9 для вас принципиально, то придется перестроить правила и реализацию именно под 9 одновременных потоков, соответственно увеличить диапазон стопора и блоки его обработки.


                                  1. arabesc
                                    06.03.2016 22:05

                                    При том, если в рабочем пуле 8 потоков, то нецелесообразно допускать к разделяемым данным потоки вне пула.
                                    Так ограничьте реализацию в соответствии с этой новой целесообразностью.
                                    Разработчик взаимодействует с кодом через протокол интерфейса, определяющий возможные инварианты состояний, а не через мутное понятие целесообразности.

                                    В вашем примерах вы любите брать 9 потоков.
                                    Это ведь допустимый инвариант? Но он небезопасный, как и другие, где необязательно 9 потоков, что зависит от спектра допустимых значений threadNum.

                                    Я в статье настаиваю на восьми.
                                    Но не ограничиваете ими реализацию.


                                    1. Tatuin
                                      07.03.2016 05:27

                                      Добавлены пояснения по настройке исходных данных в статью. Также добавлены тесты HeapAlloc/HeapFree.


                                      1. arabesc
                                        07.03.2016 05:40

                                        Также добавлены тесты HeapAlloc/HeapFree.
                                        Благодарю. Странно, что HeapAlloc получился заметно медленнее malloc, который суть тот же HeapAlloc с кучей по умолчанию.


                                  1. arabesc
                                    07.03.2016 00:16
                                    +2

                                    Если 9 для вас принципиально, то придется перестроить правила и реализацию именно под 9 одновременных потоков, соответственно увеличить диапазон стопора и блоки его обработки.
                                    Есть основание считать, что оно все равно не будет безопасно работать, для любого количества потоков.
                                    8.2.3.5 Intra-Processor Forwarding Is Allowed
                                    The memory-ordering model allows concurrent stores by two processors to be seen in different orders by those two processors; specifically, each processor may perceive its own store occurring before that of the other.

                                    The memory-ordering model imposes no constraints on the or der in which the two stores appear to execute by the two processors. This fact allows processor 0 to see its store before seeing processor 1's, while processor 1 sees its store before seeing processor 0's. (Each processor is self consistent.)…
                                    In practice, the reordering in this example can arise as a result of store-buffer forwarding. While a store is temporarily held in a processor's store buffer, it can satisfy the processor's own loads but is not visible to (and cannot satisfy) loads by other processors.
                                    Т.е. без использования средств синхронизации процессоры свободны не видеть записи других процессоров, что делает невозможным надёжную реализацию исключительного доступа предложенным способом.


  1. Ivan_83
    07.03.2016 03:03

    INT64 — бррр!
    uint64_t, size_t (SIZE_T в венде) — оно автоматом меняется в зависимости от архитектуры, кроме того знак тут вообще нигде не нужен.
    Не стоит использовать виндовые типы в верхнем регистре, потому что код будет менее переносим.

    Ещё если нужна скорость можно делать свои надстройки над HeapAlloc(), как минимум для того чтобы закреплять за каждый потоком отдельную кучу для выделения памяти, освобождаемую память ставить в очередь на освобождение через SList с атомарным доступом на основе интелокед.
    Те выделение памяти вообще без блокировок.