Прочитав эту статью, я вспомнил, как писал внешнюю сортировку, которая использовала O(1) внешней памяти. Функция получала бинарый файл и максимальный размер памяти, которую она могла выделить под массив:

void ext_sort(const std::string filename, const size_t memory)

Я использовал алгоритм из Effective Performance of External Sorting with No Additional Disk Space:

  1. Разделим файл на блоки, которые помещаются в доступную память. Обозначим эти блоки Block_1, Block_2, …, Block_(S-1), Block_S. Установим P = 1.
  2. Читаем Block_P в память.
  3. Отсортируем данные в памяти и запишем назад в Block_P. Установим P = P + 1, и если P ? S, то читаем Block_P в память и повторяем этот шаг. Другими словами, отсортируем каждый блок файла.
  4. Разделим каждый блок на меньшие блоки B_1 и B_2. Каждый из таких блоков занимает половину доступной памяти.
  5. Читаем блок B_1 блока Block_1 в первую половину доступной памяти. Установим Q = 2.
  6. Читаем блок B_1 блока Block_Q во вторую половину доступной памяти.
  7. Объеденим массивы в памяти с помощью in-place слияния, запишем вторую половину памяти в блок B_1 блока Block_Q и установим Q = Q + 1, если Q ? S, читаем блок B_1 блока Block_Q во вторую половину доступной памяти и повторяем этот шаг.
  8. Записываем первую половину доступной памяти в блок B_1 блока Block_1. Так как мы всегда оставляли в памяти меньшую половину элементов и провели слияние со всеми блоками, то в этой части памяти хранятся M минимальных элементы всего файла.
  9. Читаем блок B_2 блока Block_S во вторую половину доступной памяти. Установим Q = S ?1.
  10. Читаем блок B_2 блока Block_Q в первую половину доступной памяти.
  11. Объеденим массивы в памяти с помощью in-place слияния, запишем первую половину доступной памяти в блок B_2 блока Block_Q и установим Q = Q ?1. Если Q ? 1 читаем блок B_2 блока Block_Q в первую половину доступной памяти и повторяем этот шаг.
  12. Записываем вторую половину доступной памяти в блок B_2 блока Block_S. Аналогично шагу 8, тут хранятся максимальные элементы всего файла.
  13. Начиная от блока B_2 блока Block_1 и до блока B_1 блока Block_S, определим новые блоки в файле и снова пронумеруем их Block_1 to Block_S. Разделим каждый блок на блоки B_1 и B_2. Установим P = 1.
  14. Читаем B_1 и B_2 блока Block_P в память. Объеденим массивы в памяти. запишем отсортированный массив назад в Block_P и установим P = P +1. Если P ? S, повторяем этот шаг.
  15. Если S > 1, возвращаемся к шагу 5. Каждый раз мы выделяем M минимальных и максимальных элементов, записываем их в начало и конец файла соответственно, а потом делаем то же самое с оставшимися элементами, пока не дойдем до середины файла.

Преимущество такого алгоритма, кроме отсутствия буфера на диске, это то, что с диска мы читаем данные относительно большими порциями, что ускоряет алгоритм.

Реализуем алгоритм на C++.

Для начала определим количество блоков и размер блока в байтах и в элементах и выделим память:

	const size_t type_size = sizeof(T);
	const uint64_t filesize = file_size(filename);
	std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
	const uint64_t chunk_number = filesize / memory;
	const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
		chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;

	std::vector<T> *chunk = new std::vector<T>(chunk_size);

Теперь пункты 2-3 — сортируем каждый блок:

	for (uint64_t i = 0; i < chunk_number; ++i) {
		data.seekg(chunk_byte_size * i);
		data.read((char *) chunk->data(), chunk_byte_size);
		flat_quick_sort(chunk->begin(), chunk->end());
		data.seekp(chunk_byte_size * i);
		data.write((char *) chunk->data(), chunk_byte_size);
	}

Саму сортировку мы напишем чуть позднее.

Приступим к слияниям. Нижняя половина:

	int64_t s = chunk_number, start = 0;
	while (s > 0) {
		data.seekp(half_chunk_byte_size * start);
		data.read((char *) chunk->data(), half_chunk_byte_size);
		for (int64_t q = 1; q < s; ++q) {
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
			data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
			data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		}
		data.seekp(half_chunk_byte_size * start);
		data.write((char *) chunk->data(), half_chunk_byte_size);

И аналогично верхняя:

		data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		for (int64_t q = s - 2; q >= 0; --q) {
			data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.read((char *) chunk->data(), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.write((char *) chunk->data(), half_chunk_byte_size);
		}
		data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);

Перераспределяем блоки, завершаем цикл и не забываем освободить память:

		--s;
		++start;
		for (int64_t p = 0; p < s; ++p) {
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
			data.read((char *) chunk->data(), chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
			data.write((char *) chunk->data(), chunk_byte_size);
		}
	}

	delete chunk;
}

Функция полностью
template<typename T>
void ext_sort(const std::string filename, const size_t memory) {

	const size_t type_size = sizeof(T);
	const uint64_t filesize = file_size(filename);
	std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary);
	const uint64_t chunk_number = filesize / memory;
	const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size =
		chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2;

	std::vector<T> *chunk = new std::vector<T>(chunk_size);
	for (uint64_t i = 0; i < chunk_number; ++i) {
		data.seekg(chunk_byte_size * i);
		data.read((char *) chunk->data(), chunk_byte_size);
		flat_quick_sort(chunk->begin(), chunk->end());
		data.seekp(chunk_byte_size * i);
		data.write((char *) chunk->data(), chunk_byte_size);
	}

	int64_t s = chunk_number, start = 0;
	while (s > 0) {
		data.seekp(half_chunk_byte_size * start);
		data.read((char *) chunk->data(), half_chunk_byte_size);
		for (int64_t q = 1; q < s; ++q) {
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * q);
			data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * q);
			data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		}
		data.seekp(half_chunk_byte_size * start);
		data.write((char *) chunk->data(), half_chunk_byte_size);

		data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		for (int64_t q = s - 2; q >= 0; --q) {
			data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.read((char *) chunk->data(), half_chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q);
			data.write((char *) chunk->data(), half_chunk_byte_size);
		}
		data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size);
		data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
		--s;
		++start;
		for (int64_t p = 0; p < s; ++p) {
			data.seekp(half_chunk_byte_size * start + chunk_byte_size * p);
			data.read((char *) chunk->data(), chunk_byte_size);
			in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size);
			data.seekg(half_chunk_byte_size * start + chunk_byte_size * p);
			data.write((char *) chunk->data(), chunk_byte_size);
		}
	}

	delete chunk;
}


Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.

flat_quicksort.h
#ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H
#define EXTERNAL_SORT_FLAT_QUICKSORT_H

template<class ForwIt>
void insertion_sort(ForwIt be, ForwIt en) {
	for (ForwIt ii = be + 1; ii != en; ii++) {
		ForwIt jj = ii - 1;
		auto val = *ii;
		while (jj >= be and *jj > val) {
			*(jj + 1) = *jj;
			--jj;
		}
		*(jj + 1) = val;
	}
}

template<class ForwIt>
ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) {
	return (*it1 > *it2) ?
	       (*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 :
	       (*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1;
}

template<class ForwIt>
ForwIt choose_pivot(ForwIt be, ForwIt en) {
	ptrdiff_t s = (en - be) / 8;
	ForwIt mid = be + (en - be) / 2;
	ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3(
		en - 2 * s, en - s, en);
	return median_of_3(best1, best2, best3);
}

// search for the end of the current block
template<class ForwIt>
ForwIt block_range(ForwIt be, ForwIt en) {
	ForwIt it = be;
	while (it != en) {
		if (*be < *it)
			return it;
		++it;
	}
	return it;
}

// warning: after the partition outer pivot may point to random element
template<class ForwIt>
std::pair<ForwIt, ForwIt> partition_range(ForwIt be, ForwIt en, ForwIt pivot) {
	std::pair<ForwIt, ForwIt> re;
	ForwIt j = be;
	for (ForwIt i = be; i != en; ++i)
		if (*i < *pivot) {
			if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
			std::swap(*j, *i);
			++j;
		}
	re.first = j;
	for (ForwIt i = j; i != en; ++i)
		if (*pivot == *i) {
			if (pivot == i) pivot = j; else if (pivot == j) pivot = i;
			std::swap(*j, *i);
			++j;
		}
	re.second = j;
	return re;
}
// makes largest element the first
template<class ForwIt>
void blockify(ForwIt be, ForwIt en) {
	for (ForwIt it = be; it != en; ++it)
		if (*be < *it)
			std::swap(*be, *it);
}

template<class ForwIt>
void flat_quick_sort(ForwIt be, ForwIt en) {
	ForwIt tmp = en; // the current end of block
	while (be != en) {
		if (std::is_sorted(be, tmp)) {
			be = tmp;
			tmp = block_range(be, en);
			continue;
		}
		if (tmp - be < 32)
			insertion_sort(be, tmp);
		else {
			ForwIt pivot = choose_pivot(be, tmp);
			std::pair<ForwIt, ForwIt> range = partition_range(be, tmp, pivot);
			blockify(range.second, tmp);
			tmp = range.first;
		}
	}
}

#endif //EXTERNAL_SORT_FLAT_QUICKSORT_H


Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:

template<typename T>
void merge(std::vector<T> &chunk, size_t s, size_t q, size_t r) {
	std::vector<T> *chunk2 = new std::vector<T>(q - s + 1);
	auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s;
	std::copy(it, it1, it2);
	while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) {
		if (*it1 > *it2) {
			*it = *it2;
			++it2;
		} else {
			*it = *it1;
			++it1;
		}
		++it;
	}
	if (it1 == chunk.begin() + r + 1)
		std::copy(it2, chunk2->end(), it);
	else
		std::copy(it1, chunk.begin() + r + 1, it);
	delete chunk2;
}

Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:

in-place merge
template<typename Iter>
void merge_B_and_Y(Iter z, Iter y, Iter yn) {
	for (; z < y && y < yn; ++z) {
		Iter j = std::min_element(z, y);
		if (*j <= *y)
			std::swap(*z, *j);
		else {
			std::swap(*z, *y);
			++y;
		}
	}
	if (z < y)
		flat_quick_sort(z, yn);
}

template<typename Iter>
Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1,
                       Iter b2, auto max) {
	auto min1 = max, min2 = max;
	Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0;
	if (m <= z)
		m += k;

	for (auto i = m; i + k <= y; i += k) {
		if (i != b1 && i != b2) {
			Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1;
			if (*i <= min1 && *j <= min2) {
				x = i;
				min1 = *i;
				min2 = *j;
			}
		}
	}
	return x;
}

template<typename Iter>
void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) {
	if (k == -1)
		k = (int64_t) sqrt(yn - x0);
	size_t f = (y0 - x0) % k;
	Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f;
	auto t = *x, max = *std::max_element(x0, yn);
	*x = *x0;
	Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k;
	int i = 0;
	while (y - z > 2 * k) {
		++i;
		if (*x <= *y || y >= yn) {
			*z = *x;
			*x = *b1;
			++x;
			if ((x - x0) % k == f) if (z < x - k)
				b2 = x - k;
			x = find_next_X_block(x0, z, y, k, f, b1, b2, max);
		} else {
			*z = *y;
			*y = *b1;
			++y;
			if ((y - y0) % k == 0)
				b2 = y - k;
		}
		++z;
		*b1 = *z;
		if (z == x)
			x = b1;
		if (z == b2)
			b2 = yn + 1;
		++b1;
		if ((b1 - x0) % k == f)
			b1 = (b2 == yn + 1) ? b1 - k : b2;
	}
	*z = t;
	if (rec)
		merge_B_and_Y(z, y, yn);
	else  {
		flat_quick_sort(z, y);
		in_place_merge(z,y,yn,(int64_t)sqrt(k),true);
	}
}


Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:
Алгоритм Время (75MB int64 в 7,5MB памяти) Скорость (75MB int64 в 7,5MB памяти) Время (7,5MB int64 в 75KB памяти) Скорость (7,5MB int64 в 75KB памяти) Время (750MB int64 в 75MB памяти) Скорость (750MB int64 в 75MB памяти)
In-place merge 6.04329 s 1 241 045 B/s 24.2993 s 3 086 508 B/s - -
Merge 0.932663 s 8 041 489 B/s 2.73895 s 27 382 756 B/s 47.7946 s 15 691 689 B/s
Алгоритм SLY_G 1.79629 s 4175272 B/s 3.84775 s 19 491 910 B/s 39.77 s 18 858 436 B/s
К сожалению, на больших объемах алгоритм замедляется, что ожидаемо, ведь мы не используем никакого буфера вообще. Тем не менее, скорость алгоритма достаточно адекватная, и, я уверен, может быть улучшена.

Все исходники лежат здесь.

Комментарии (9)


  1. Ramires
    09.10.2015 14:07

    Если я верно понял, это вариация "In-place merge sort"?

    Да, надо читать теги.


    1. Randl
      09.10.2015 14:15
      +1

      Не совсем. In-place merge sort это вариант сортировки слиянием, который не использует внешний буфер. При этом весь массив находится в памяти.
      Для внешней сортировки (когда данные не помещаются в памяти) нельзя использовать те же алгоритмы, это будет слишком медленно. Этот алгоритм — внешняя сортировка, точнее, её вариант, не требующий буфера на диске. Он может пригодится, например, если вам нужно отсортировать 980Гб данных на терабайтном диске (хотя это и займет кучу времени, судя по всему).


  1. Door
    09.10.2015 16:44
    +2

    А зачем вы делаете new std::vector<T>? Это никак не влияет на скорость/пам'ять?


    1. 3Hren
      09.10.2015 16:56
      +6

      Влияет и, более того, бессмысленно.

      Из серии Yo dawg, I heard you like to allocate on heap so we put your vector on heap so you can allocate on heap while allocating on heap.


  1. 3Hren
    09.10.2015 16:47
    -1

    Что будет, если между new и delete будет выброшено исключение?
    Что будет, если условие не выполняется?

    std::is_trivially_copyable<T>::value == false
    


  1. Mrrl
    09.10.2015 19:23

    Сколько получается операций слияния блоков? O(S^2), или, всё-таки, меньше?


    1. Randl
      09.10.2015 23:59

      O(S^2): Мы делаем S проходов, и для каждого K от 1 до S делаем S-1 слияний с первым блоком и столько же с последним. Получается S(S-1) слияний.


      1. Mrrl
        10.10.2015 08:00

        То есть, получается сложность O(N*S*log(N/S)). Многовато…
        С помощью блочного in-place merge улучшить её, конечно, не получится: при поиске следующего блока придётся проходить по всему файлу, а чтение первой и последней записи работает ненамного быстрее, чем чтение блока целиком.
        Впрочем, есть и другой, более простой вариант in-place merge sort. Правда, использовать его для файла не очень удобно.
        Но должно быть легко реализовать обычный quicksort: при разделении очередного фрагмента подгружаете в память его начало и конец, а потом двигаете их навстречу пока они не сомкнутся. Когда маленький фрагмент умещается в память — сортируете его там целиком. Получится O(S*log(S)) чтений/записей блока.


  1. Sonic_SE
    11.10.2015 20:49

    stxxl