В этой статье мы рассмотрим следующую задачу: поиск и поддержание медианы среди целых чисел, которые последовательно попадают на обработку. В этом посте мы поставим задачу, разберём все необходимые вводные, предложим и оценим сложность решения.
Постановка задачи
На вход алгоритму подаётся поток целых чисел, т.е. количество чисел может быть неизвестно, но мы будем считать, что массив задан наперёд и его длина очень большая. Требуется разработать алгоритм, который определяет медиану текущего массива, т.е. считанного из исходного к данному моменту. При этом требуется, чтобы сложность такого алгоритма была
Медиана ряда чисел
Начнём с базовых понятий. Медианой называется число, стоящее в упорядоченном ряде чисел посередине. Например в ряду: 1, 2, 3, 7, 9 — тройка является медианой. Если количество чисел чётное, то за медиану принимают среднее двух стоящих в центре чисел.
Либо можно выбирать элемент под номером , если чётное и если нечетное.
Наивный подход
Давайте обсудим бейзлайновое решение, при котором медиану можно получить за .
Пусть каждое новое число из потока мы будем вставлять в массив так, чтобы массив оставался упорядоченным. Затем будем выбирать элемент из середины и добавлять его в список медиан.
medians = []
items_read = []
with open('./Median.txt', 'r') as f:
for line in f:
num = int(line.strip())
for idx in range(len(items_read)):
if num < items_read[idx]:
items_read.insert(idx, num)
break
if len(items_read) % 2 == 0:
median_pos = len(items_read) // 2
else:
median_pos = len(items_read) + 1 // 2
medians.append(median_pos)
Как упоминалось выше, этот алгоритм будет иметь квадратичную сложность, поскольку для каждого из элементов потока, мы выполняем линейную работу по поиску места и вставке элемента в массив.
Улучшить этот результат нам поможет структура данных — куча.
Куча. Min-heap, max-heap
Рассмотрим кучу на примере min-heap. Min-heap — это бинарное дерево, обладающее двумя следующими свойствами:
- ключи любого узла этого дерева всегда меньше, чем ключи его двух дочерних узлов,
- такое дерево является полным, т.е. у него должны быть заполнены все уровни, за исключением последнего.
Пример кучи (источник)
Аналогично образом задаётся max-heap, нужно заменить «меньше» на «больше» в первом свойстве.
При решении задачи мы хотим воспользоваться операциями, которые благодаря построению кучи, могут быть выполнены быстрее, чем за линейное время.
Первая из этих операций: взятие минимума (максимума) и удаление
Работая с кучей, операцию взятия минимума можно осуществить за константное время. Поскольку минимум всегда хранится в корне дерева, то узнать его значение не составляет труда. Если же мы хотим удалить минимум и назначить на его место следующий по величине элемент, то нам потребуется вызвать метод extract, чья временная сложность тоже меньше линейной и равна .
Метод extract внутри себя запускает следующий процесс: сначала элемент с самого последнего уровня ставится в корень дерева, затем на корне дерева стартует метод bubble_down, который уровень за уровнем (а таких всего в полном дереве) опускает новый корневой узел.
Код реализации на языке Python смотри ниже.
Вторая операция: добавление элемента
Чтобы добавить произвольный элемент в кучу требуется выставить новый элемент на правильное место, не утратив 2 свойства кучи. Для этого новый элемент добавляется на последний уровень, а затем методом bubble_up поднимается в сторону корня, пока над ним не окажется элемент меньший него или он не станет корнем. Сложность этой операции также равна
Код, в котором мы определим необходимую функциональность с возможностью определения min и max-heap:
class Heap(object):
def __init__(self, array, type):
super(Heap, self).__init__()
self.type = type
self.data = []
self.heapify(array)
def heapify(self, array):
for item in array:
self.insert(item)
def bubble_down(self, array, index=0):
left = 2 * index + 1
right = 2 * index + 2
size = len(array)
least = index
if self.type == 'min':
if left < size and array[left] < array[least]:
least = left
if right < size and array[right] < array[least]:
least = right
if least != index:
tmp_val = array[least]
array[least] = array[index]
array[index] = tmp_val
self.bubble_down(array, index=least)
else:
if left < size and array[left] > array[least]:
least = left
if right < size and array[right] > array[least]:
least = right
if least != index:
tmp_val = array[least]
array[least] = array[index]
array[index] = tmp_val
self.bubble_down(array, index=least)
def bubble_up(self, array, index):
parent = (index - 1) // 2
if self.type == 'min':
if array[index] < array[parent] and parent >= 0:
tmp_val = array[parent]
array[parent] = array[index]
array[index] = tmp_val
self.bubble_up(array, index=parent)
else:
if array[index] > array[parent] and parent >= 0:
tmp_val = array[parent]
array[parent] = array[index]
array[index] = tmp_val
self.bubble_up(array, index=parent)
def extract(self):
root = self.data.pop(0)
self.data.insert(0, self.data.pop(-1))
self.bubble_down(self.data, 0)
return root
def insert(self, element):
self.data.append(element)
self.bubble_up(self.data, index=len(self.data)-1)
if __name__ == "__main__":
a = [3,99,4,88,0,5,1,2]
b = Heap(a, type='max')
print(b.data)
Оптимальное решение
Теперь перейдем непосредственно к реализации алгоритма контроля медианы, основанном на использовании кучи. Мы будем использовать две кучи, одну минимальную, другую максимальную. Идея заключается в следующем: давайте разделим поток значений на верхнюю часть, содержащую большие значения и нижнюю, содержащую меньшие значения. Первую реализуем на основе min-heap, чтобы легко получать минимальный элемент, который лежит на разделе, а вторую на основе max-heap.
Всякий раз, когда мы читаем из потока очередное число, будем добавлять его в верхнюю часть, если оно больше наименьшего из этой половины и в нижнюю часть, если верно обратное. Затем, осуществив вставку, будем балансировать две части, чтобы они содержали по половине из введенных значений.
Код:
from heap import Heap
top_nums = Heap([], 'min')
bot_nums = Heap([], 'max')
medians = []
with open('./Median.txt', 'r') as f:
for line in f:
num = int(line.strip())
if len(top_nums.data):
top_smallest = top_nums.data[0]
if num > top_smallest:
top_nums.insert(num)
else:
bot_nums.insert(num)
else:
bot_nums.insert(num)
while len(top_nums.data) > len(bot_nums.data):
bot_nums.insert(top_nums.extract())
while len(top_nums.data) + 1 < len(bot_nums.data):
top_nums.insert(bot_nums.extract())
medians.append(bot_nums.data[0])
Каждую итерацию внешнего цикла, мы делаем несколько шагов сложностью , посколько операции вставки и получения элемента из кучи ограничены этой сложностью. По этой причине итоговая сложность не превышает .
Заключение
В этой статье на примере задачи мы обсудили преимущества кучи по сравнению со списком. Познакомились с временной сложностью операций над этой структурой данных. Реализовали код этой структуры, необходимый для эффективного выполнения задачи по поиску медианного элемента в потоке чисел.
В преддверии старта курса «Алгоритмы и структуры данных» приглашаем всех желающих на бесплатный двухдневный интенсив по теме: Алгоритм сжатия данных — код Хаффмана.
middle
Можно просто собрать в массив и отсортировать его. Получаем те же O(n) по памяти и те же O(n*log n) по времени. При этом сортировка некоторыми алгоритмами будет в невырожденном случае опережать сортировку кучей, которую вы фактически предложили, за счёт меньшей константы (которая тоже имеет значение).
А потом идём в гугл и таки находим алгоритм с линейным средним временем выполнения…