В этой статье мы рассмотрим следующую задачу: поиск и поддержание медианы среди целых чисел, которые последовательно попадают на обработку. В этом посте мы поставим задачу, разберём все необходимые вводные, предложим и оценим сложность решения.

Постановка задачи


На вход алгоритму подаётся поток целых чисел, т.е. количество чисел N может быть неизвестно, но мы будем считать, что массив задан наперёд и его длина очень большая. Требуется разработать алгоритм, который определяет медиану текущего массива, т.е. считанного из исходного к данному моменту. При этом требуется, чтобы сложность такого алгоритма была O(Nlog(N)).


Медиана ряда чисел


Начнём с базовых понятий. Медианой называется число, стоящее в упорядоченном ряде чисел посередине. Например в ряду: 1, 2, 3, 7, 9 — тройка является медианой. Если количество чисел чётное, то за медиану принимают среднее двух стоящих в центре чисел.

Либо можно выбирать элемент под номером k/2, если k чётное и (k+1)/2, если нечетное.



Наивный подход


Давайте обсудим бейзлайновое решение, при котором медиану можно получить за O(N^2).

Пусть каждое новое число из потока мы будем вставлять в массив так, чтобы массив оставался упорядоченным. Затем будем выбирать элемент из середины и добавлять его в список медиан.

medians = []
items_read = []
with open('./Median.txt', 'r') as f:
    for line in f:
        num = int(line.strip())
        for idx in range(len(items_read)):
            if num < items_read[idx]:
                items_read.insert(idx, num)
                break
        if len(items_read) % 2 == 0:
            median_pos = len(items_read) // 2
        else:
            median_pos = len(items_read) + 1 // 2
        medians.append(median_pos)

Как упоминалось выше, этот алгоритм будет иметь квадратичную сложность, поскольку для каждого из N элементов потока, мы выполняем линейную работу по поиску места и вставке элемента в массив.


Улучшить этот результат нам поможет структура данных — куча.

Куча. Min-heap, max-heap


Рассмотрим кучу на примере min-heap. Min-heap — это бинарное дерево, обладающее двумя следующими свойствами:

  1. ключи любого узла этого дерева всегда меньше, чем ключи его двух дочерних узлов,
  2. такое дерево является полным, т.е. у него должны быть заполнены все уровни, за исключением последнего.

image
Пример кучи (источник)

Аналогично образом задаётся max-heap, нужно заменить «меньше» на «больше» в первом свойстве.
При решении задачи мы хотим воспользоваться операциями, которые благодаря построению кучи, могут быть выполнены быстрее, чем за линейное время.

Первая из этих операций: взятие минимума (максимума) и удаление


Работая с кучей, операцию взятия минимума можно осуществить за константное время. Поскольку минимум всегда хранится в корне дерева, то узнать его значение не составляет труда. Если же мы хотим удалить минимум и назначить на его место следующий по величине элемент, то нам потребуется вызвать метод extract, чья временная сложность тоже меньше линейной и равна O(log(N)).

Метод extract внутри себя запускает следующий процесс: сначала элемент с самого последнего уровня ставится в корень дерева, затем на корне дерева стартует метод bubble_down, который уровень за уровнем (а таких всего log(N) в полном дереве) опускает новый корневой узел.
Код реализации на языке Python смотри ниже.

Вторая операция: добавление элемента


Чтобы добавить произвольный элемент в кучу требуется выставить новый элемент на правильное место, не утратив 2 свойства кучи. Для этого новый элемент добавляется на последний уровень, а затем методом bubble_up поднимается в сторону корня, пока над ним не окажется элемент меньший него или он не станет корнем. Сложность этой операции также равна O(log(N))

Код, в котором мы определим необходимую функциональность с возможностью определения min и max-heap:

class Heap(object):
    def __init__(self, array, type):
        super(Heap, self).__init__()
        self.type = type
        self.data = []
        self.heapify(array)
    
    def heapify(self, array):
        for item in array:
            self.insert(item)

    def bubble_down(self, array, index=0):
        left = 2 * index + 1 
        right = 2 * index + 2
        size = len(array)
        least = index
        if self.type == 'min':
            if left < size and array[left] < array[least]:
                least = left
            if right < size and array[right] < array[least]:
                least = right
            if least != index:
                tmp_val = array[least]
                array[least] = array[index]
                array[index] = tmp_val
                self.bubble_down(array, index=least)
        else:
            if left < size and array[left] > array[least]:
                least = left
            if  right < size and array[right] > array[least]:
                least = right
            if least != index:
                tmp_val = array[least]
                array[least] = array[index]
                array[index] = tmp_val
                self.bubble_down(array, index=least)
    
    def bubble_up(self, array, index):
        parent = (index - 1) // 2
        if self.type == 'min':
            if array[index] < array[parent] and parent >= 0:
                tmp_val = array[parent]
                array[parent] = array[index]
                array[index] = tmp_val
                self.bubble_up(array, index=parent)
        else:
            if array[index] > array[parent] and parent >= 0:
                tmp_val = array[parent]
                array[parent] = array[index]
                array[index] = tmp_val
                self.bubble_up(array, index=parent)

    def extract(self):
        root = self.data.pop(0)
        self.data.insert(0, self.data.pop(-1))
        self.bubble_down(self.data, 0)
        return root

    def insert(self, element):
        self.data.append(element)
        self.bubble_up(self.data, index=len(self.data)-1)

if __name__ == "__main__":
    a = [3,99,4,88,0,5,1,2]
    b = Heap(a, type='max')
    print(b.data)

Оптимальное решение


Теперь перейдем непосредственно к реализации алгоритма контроля медианы, основанном на использовании кучи. Мы будем использовать две кучи, одну минимальную, другую максимальную. Идея заключается в следующем: давайте разделим поток значений на верхнюю часть, содержащую большие значения и нижнюю, содержащую меньшие значения. Первую реализуем на основе min-heap, чтобы легко получать минимальный элемент, который лежит на разделе, а вторую на основе max-heap.

Всякий раз, когда мы читаем из потока очередное число, будем добавлять его в верхнюю часть, если оно больше наименьшего из этой половины и в нижнюю часть, если верно обратное. Затем, осуществив вставку, будем балансировать две части, чтобы они содержали по половине из введенных значений.

Код:

from heap import Heap


top_nums = Heap([], 'min')
bot_nums = Heap([], 'max')
medians = []
with open('./Median.txt', 'r') as f:
    for line in f:
        num = int(line.strip())
        if len(top_nums.data):
            top_smallest = top_nums.data[0]
            if num > top_smallest:
                top_nums.insert(num)
            else:
                bot_nums.insert(num)
        else:
            bot_nums.insert(num)
        while len(top_nums.data) > len(bot_nums.data):
            bot_nums.insert(top_nums.extract())
        while len(top_nums.data) + 1 < len(bot_nums.data):
            top_nums.insert(bot_nums.extract())
        medians.append(bot_nums.data[0])

Каждую итерацию внешнего цикла, мы делаем несколько шагов сложностью O(log(N)), посколько операции вставки и получения элемента из кучи ограничены этой сложностью. По этой причине итоговая сложность не превышает O(Nlog(N)).

Заключение


В этой статье на примере задачи мы обсудили преимущества кучи по сравнению со списком. Познакомились с временной сложностью операций над этой структурой данных. Реализовали код этой структуры, необходимый для эффективного выполнения задачи по поиску медианного элемента в потоке чисел.

В преддверии старта курса «Алгоритмы и структуры данных» приглашаем всех желающих на бесплатный двухдневный интенсив по теме: Алгоритм сжатия данных — код Хаффмана.