Некоторое время назад мой коллега попросил помочь ему с одной проблемой. Проблему я ему решил, но кроме того, мне показалось, что на решении этой проблемы можно объяснить несколько алгоритмов и приёмов программирования. А также показать ускорение времени выполнения алгоритма с 25 сек до 40 мс.


Постановка задачи


Для личного проекта моему коллеге понадобился алгоритм нахождения для заданного видео пятидесяти максимально похожих видео. Похожесть предполагалось оценивать по количеству совпадающих выставленных тегов. Чем больше тегов у видео совпадает, тем более они похожи. Из этого можно сразу сделать несколько выводов:


  • все теги под видео можно объединить в одну группу;
  • таких групп точно будет не больше чем самих видео;
  • если видео похоже на другое видео из какой-то группы тегов, то оно в такой же степени похоже на другие видео из этой группы;

Выходит, что достаточно работать только с группами тегов. В первой версии коллега решил хранить теги в таблице тегов: у каждого видео есть ссылка на ID группы тегов, а сами группы представляют собой последовательность булевых значений, которые показывают выставлен ли соответствующий тег. На C# группа тегов выглядит вот так:


public class TagsGroup {
    bool[] InnerTags { get; }
}

Коллега предполагал, что на сайте у него будет не более одного миллиона видео, а различных тегов не более 4000, для круглого счёта можно взять 4096 = 2^12.
Тогда класс TagsGroup можно представить вот в таком виде:


public class TagsGroup {
    public const int TagsGroupLength = 4096;
    bool[] InnerTags { get; }

    public TagsGroup(bool[] innerTags) {
        if (innerTags == null)
            throw new ArgumentException(nameof(innerTags));
        if (innerTags.Length != TagsGroupLength) {
            throw new ArgumentException(nameof(innerTags));
        }
        InnerTags = innerTags;
    }
}

Теперь необходимо проверить две группы тегов на похожесть. В текущих условиях это превращается в простую проверку на совпадение true в соответствующих элементах массивов InnerTags у двух групп тегов:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength; i++) {
        if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i])
            result++;
    }
    return result;
}

Теперь осталось только подсчитать похожесть нужной группы тегов с каждой существующей группой и выбрать пятьдесят наиболее похожих. Я поставил сам себе ещё условие обеспечить устойчивость выборки, т.е. в итоговой выборке будут пятьдесят групп тегов, у которых MeasureSimilarity выдал наибольший результат и при этом у групп тегов с одинаковым MeasureSimilarity меньший индекс будет у тех, у кого был меньший индекс в исходной существующей группе. Подробнее можно почитать, например, здесь: https://ru.wikipedia.org/wiki/Устойчивая_сортировка.
Для решения этой задачи я решил сделать класс SimilarTagsCalculator, вот его код:


SimilarTagsCalculator
class SimilarTagsCalculator {
    TagsGroup[] Groups { get; }

    public SimilarTagsCalculator(TagsGroup[] groups) {
        if (groups == null)
            throw new ArgumentException(nameof(groups));
        Groups = groups;
    }

    public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) {
        const int resultLength = 50;
//список, где хранятся наиболее похожие на данный момент группы тегов
        var list = new List<TagsSimilarityInfo>(resultLength);
//бежим по всем доступным группам тегов
        for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) {
            TagsGroup tagsGroup = Groups[groupIndex];
//измеряем похожесть текущей группы с нужной
            int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
//создаём инфо-объект сравнения
            TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
//если текущая группа менее похожая, чем наименее похожая из найденных,
            if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { 
                continue; //то нет смысла её добавлять
            }
//бинарным поиском ищем место, куда следует вставить инфо-объект сравнения
            int index = ~list.BinarySearch(newInfo);
            list.Insert(index, newInfo); //вставляем
            if (list.Count > resultLength) { 
//если количество элементов превысило нужное,
//то просто выталкиваем последний, т.к. он гарантированно хуже
                list.RemoveAt(resultLength);
            }
        }
        //конвертируем инфо-объекты в результат
        TagsGroup[] result = new TagsGroup[resultLength];
        for (int i = 0; i < resultLength; i++) {
            result[i] = Groups[list[i].Index];
        }
        return result;
    }
}

и структуру TagsSimilarityInfo:


TagsSimilarityInfo
struct TagsSimilarityInfo : IComparable<TagsSimilarityInfo>, IComparable {
    public int Index { get; }

    public int Similarity { get; }

    public TagsSimilarityInfo(int index, int similarity) {
        Index = index;
        Similarity = similarity;
    }

    public bool Equals(TagsSimilarityInfo other) {
        return Index == other.Index && Similarity == other.Similarity;
    }

    public override bool Equals(object obj) {
        return obj is TagsSimilarityInfo other && Equals(other);
    }

    public override int GetHashCode() {
        unchecked {
            return (Index * 397) ^ Similarity;
        }
    }

    public int CompareTo(TagsSimilarityInfo other) {
        int similarityComparison = other.Similarity.CompareTo(Similarity);
        return similarityComparison != 0 ? similarityComparison : Index.CompareTo(other.Index);
    }

    public int CompareTo(object obj) {
        if (ReferenceEquals(null, obj))
            return 1;
        return obj is TagsSimilarityInfo other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(TagsSimilarityInfo)}");
    }
}

Я подготовил три бенчмарка для этого алгоритма:


  • полностью рандомный бенчмарк, т.е. количество выставленных тегов в группах случайно и группа тегов, с которой будем сравнивать, тоже случайна;
  • количество выставленных тегов в группах идёт по возрастающей, сравнивать будем с группой, в которой все теги выставлены. Получается самыми подходящими должны быть какие-то из последних групп тегов;
  • аналогичное, что и выше, но количество выставленных тегов идёт по убывающей. Самыми подходящими будут первые 50 групп тегов;

Вот результаты бенчмарка для миллиона групп:


BenchmarkDotNet=v0.11.5, OS=Windows 10.0.17134.765 (1803/April2018Update/Redstone4)
Intel Core i7-6700 CPU 3.40GHz (Skylake), 1 CPU, 8 logical and 4 physical cores
Frequency=3328126 Hz, Resolution=300.4694 ns, Timer=TSC
.NET Core SDK=3.0.100-preview5-011568
[Host]: .NET Core 3.0.0-preview5-27626-15 (CoreCLR 4.6.27622.75, CoreFX 4.700.19.22408), 64bit RyuJIT


Method Mean Error StdDev Allocated
RandomTest 25.054 s 0.1786 s 0.1670 s 1.53 KB
AscendantTest 4.180 s 0.0174 s 0.0162 s 1.53 KB
DescendantTest 4.147 s 0.0118 s 0.0104 s 1.53 KB

Разброс времени выполнения очень большой, к тому же 25 секунд — это очень долго, мой коллега не согласен столько ждать. Значит займёмся оптимизациями. Сейчас можно выделить три основных направления для ускорения программы:


  • метод MeasureSimilarity;
  • алгоритм в теле цикла в GetFiftyMostSimilarGroups;
  • сам цикл в GetFiftyMostSimilarGroups;

Будем рассматривать каждое из трёх направлений последовательно.


Предсказание ветвлений


Рассмотрим вначале метод MeasureSimilarity.


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength; i++) {
        if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i])
            result++;
    }
    return result;
}

В предыдущем бенчмарке очень большой разброс времени выполнения между рандомным тестом и любым из последовательных. Группы тегов для последовательных тестов создавались по такому принципу:


  • требуемое количество групп делилось на пакеты. Количество пакетов — максимальное количество тегов в группе;
  • для каждой группы в i-м пакете первые i тегов выставлялись;

Получается, что каждая группа тегов в этих тестах состоит из двух подряд идущих частей из выставленных и невыставленных тегов. В MeasureSimilarity есть все предпосылки для того, чтобы в текущих условиях предсказание ветвлений процессором оказывало значительный эффект. Чтобы это проверить достаточно написать бенчмарк, в котором сравнивается время работы MeasureSimilarity для случайных данных и для последовательных:


int GetSimilaritySum(TagsGroup[] tagsGroups) {
    int result = 0;
    foreach (TagsGroup tagsGroup in tagsGroups) {
        result += TagsGroup.MeasureSimilarity(tagsGroup, etalon);
    }
    return result;
}

[Benchmark]
public int Sorted() => GetSimilaritySum(sortedGroups);

[Benchmark]
public int Unsorted() => GetSimilaritySum(unsortedGroups);

Method Mean Error StdDev
Sorted 3.704 s 0.0411 s 0.0364 s
Unsorted 8.211 s 0.0381 s 0.0338 s

Тестировался миллион групп тегов, но в Sorted в каждой группе вначале шло несколько выставленных тегов, а потом невыставленные, а в Unsorted такое же количество выставленных тегов было случайно разбросано по всей группе.
Разница в 5 секунд впечатляет, и с этим надо что-то делать. Чтобы избавиться от влияния предсказания ветвлений и в целом ускорить метод, нужно избавиться от самих ветвлений. В MeasureSimilarity ветвление только одно — проверка на то, что в двух группах выставлены соответствующие теги. Давайте прикинем, в каких случаях условие будет истинным, для этого составим таблицу истинности условия:


a.InnerTags[i] b.InnerTags[i] Result
False False False
False True False
True False False
True True True

Таблица истинности полностью совпадает с логическим "И", т.е. результат истинный тогда и только тогда, когда оба тега истинны, значит условие можно сократить до: if (a.InnerTags[i] && b.InnerTags[i]). Но таким образом условие всё равно остаётся. На следующем этапе сделаем так, чтобы прибавление к result выполнялось всегда, для этого перепишем тело цикла вот так:


int t = a.InnerTags[i] && b.InnerTags[i] ? 1 : 0;
result += t;

Мы всё равно не избавились от условия и на самом деле даже сделали метод медленнее. Но зато теперь стало очевидным, что если тип у InnerTags изменить с bool на byte (1 для true, и 0 для false), то можно избавиться от условия в тернарном операторе. Тогда класс TagsGroup будет выглядеть вот так:


TagsGroup
public class TagsGroup {
    public const int TagsGroupLength = 4096;
    byte[] InnerTags { get; }

    public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
        int result = 0;
        for (int i = 0; i < TagsGroupLength; i++) {
            int t = a.InnerTags[i] & b.InnerTags[i];
            result += t;
        }
        return result;
    }

    public TagsGroup(bool[] innerTags) {
        if (innerTags == null)
            throw new ArgumentException(nameof(innerTags));
        if (innerTags.Length != TagsGroupLength) {
            throw new ArgumentException(nameof(innerTags));
        }
        InnerTags = new byte[TagsGroupLength];
        for (int i = 0; i < TagsGroupLength; i++) {
            InnerTags[i] = (byte) (innerTags[i] ? 1 : 0);
        }
    }
}

Вот результаты бенчмарка для обновленного MeasureSimilarity:


Method Mean Error StdDev
Sorted 3.180 s 0.0118 s 0.0111 s
Unsorted 3.236 s 0.0622 s 0.0764 s

было:
Method Mean Error StdDev
Sorted 3.704 s 0.0411 s 0.0364 s
Unsorted 8.211 s 0.0381 s 0.0338 s

а вот для обновленного главного бечмарка:


Method Mean Error StdDev Allocated
RandomTest 3.219 s 0.0492 s 0.0436 s 1.53 KB
AscendantTest 3.223 s 0.0117 s 0.0110 s 1.53 KB
DescendantTest 3.422 s 0.0697 s 0.0999 s 1.53 KB

было:
Method Mean Error StdDev Allocated
RandomTest 25.054 s 0.1786 s 0.1670 s 1.53 KB
AscendantTest 4.180 s 0.0174 s 0.0162 s 1.53 KB
DescendantTest 4.147 s 0.0118 s 0.0104 s 1.53 KB

По-моему, уже стало здорово. Для уверенных в том, что всё ускорение произошло только из-за того, что булевый тип был заменён на байт, я запустил бенчмарк для такого тела цикла:


int t = a.InnerTags[i] & b.InnerTags[i];
if (t == 1)
    result += t;

и вот какие получил результаты:


Method Mean Error StdDev
Sorted 3.760 s 0.0746 s 0.1541 s
Unsorted 8.628 s 0.1699 s 0.2382 s

Упаковывание данных


В каждой группе много тегов, и их количество никак нельзя уменьшить. Кроме того, необходимо сравнивать теги с одинаковыми индексами, и нельзя дать окончательный ответ, не проверив все теги. Значит, нам в любом случае придётся итерироваться по всей группе тегов. Было бы здорово суметь как-то распараллелить эту задачу, чтобы можно было за условно одну операцию обрабатывать несколько тегов. Можно сделать это через настоящее распараллеливание, а можно через специальную упаковку данных, чем и воспользуемся. Каждый тег сейчас представляет собой 1 или 0. В result просто накапливается результат операции "И". Но эту же логическую операцию можно применять и не только над однобитными числами. C# позволяет это делать без особых проблем вплоть до 64 битных чисел (можно и больше через BitArray, но это уже не то). Если представить две группы тегов как набор 64 битных чисел с выставленными соответствующими битами, то можно будет проводить операцию "И" над каждой такой группой 64 битных чисел. Непонятно только, что потом делать с результатом. Посмотрим ещё раз на тело цикла:


int t = a.InnerTags[i] & b.InnerTags[i];
result += t;

result увеличивается на 1 каждый раз, когда t == 1 и не изменяется когда t == 0. В итоге result станет равен тому, сколько раз результатом a.InnerTags[i] & b.InnerTags[i] была единица. Соответственно можно было бы сохранить все результаты a.InnerTags[i] & b.InnerTags[i] в какой-нибудь массив, и в result записать только количество единиц в этом массиве. При выполнении операции "И" над больше чем n-битными числами будет n-битный результат и достаточно будет только знать сколько выставленных бит среди n. Количество выставленных бит в числе неизменно, а значит можно предподсчитать эти числа. Предподсчитывать для 64 бит нет смысла, т.к. мы не найдём столько оперативной памяти. Для 32 бит уже можно найти пространство на современных компьютерах, но это по-прежнему очень много. Память под 16 бит найти несложно, но подсчёт будет относительно долгим. В качестве компромисса предподсчитаем для 8 битных чисел:


GenerateCountOfSettedBits
static readonly byte[] CountOfSettedBits = GenerateCountOfSettedBits();

static byte[] GenerateCountOfSettedBits() {
// в result для каждого i хранится сколько битов выставлено в i-м числе.
    byte[] result = new byte[256]; 
// вспомогательный массив, в нём будет представлено число i в виде битов, 
// будем к нему прибавлять единицу в столбик
    int[] b = new int[8];
// перебираем все восьмибитные числа
    for (int i = 1; i < 256; i++) {
//сбрасываем счётчик выставленных битов в текущем числе
        int settedBitsCount = 0;
//остаток, который надо прибавить к следующему биту
        int m = 1;
//бежим по битам
        for (int j = 0; j < 8; j++) {   
//прибавляем остаток к текущему биту
            b[j] += m;
//записываем в остаток, если число в текущем бите превысило 2.
            m = b[j] >> 1;
//записываем в текущий бит число без учёта остатка
            b[j] = b[j] & 1;        
//возможно, увеличиваем количество выставленных битов в текущем числе
            settedBitsCount += b[j];
        }
        result[i] = (byte) settedBitsCount; //записываем в ответ
    }
    return result;
}

теперь конструктор TagsGroup выглядит вот так:


const int BucketSize = 8;

public TagsGroup(bool[] innerTags) {
    if (innerTags == null)
        throw new ArgumentException(nameof(innerTags));
    if (innerTags.Length != TagsGroupLength) {
        throw new ArgumentException(nameof(innerTags));
    }
    int index = 0; //индекс среди исходных тегов
    InnerTags = new byte[TagsGroupLength / BucketSize];
//перебираем внутренние пакеты
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
//перебираем теги в пределах пакета
        for (int j = 0; j < BucketSize; j++, index++) {
//увеличиваем текущий проект на 2, чтобы освободить место для нового
            InnerTags[i] <<= 1;
//выставляем бит в пакете
            InnerTags[i] += (byte) (innerTags[index] ? 1 : 0);
        }
    }
}

А MeasureSimilarity стал выглядеть вот так:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        int t = a.InnerTags[i] & b.InnerTags[i];
        result += CountOfSettedBits[t];
    }
    return result;
}

Можно запустить большой бенчмарк и убедиться, что всё стало лучше:


Method Mean Error StdDev Allocated
RandomTest 560.5 ms 8.285 ms 7.344 ms 1.53 KB
AscendantTest 570.1 ms 4.108 ms 3.431 ms 1.53 KB
DescendantTest 608.1 ms 5.691 ms 5.324 ms 1.53 KB

Можно ли сделать метод MeasureSimilarity ещё быстрее? Конечно! Для этого достаточно осознать тот факт, что регистры общего назначения сейчас в основном 64 битные, а мы гоняем в них восьмибитные данные. Для этого размер пакета, в которые пакуются исходные теги, увеличим до 64 бит и перепишем нужные методы:


const int BucketSize = 64;

ulong[] InnerTags { get; }

public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        for (int j = 0; j < BucketSize / 8; j++) {
            result += CountOfSettedBits[t & 255];
            t >>= 8;
        }
    }
    return result;
}

и получится:


Method Mean Error StdDev Allocated
RandomTest 533.3 ms 4.802 ms 4.492 ms 1.53 KB
AscendantTest 550.9 ms 5.435 ms 5.084 ms 1.53 KB
DescendantTest 567.6 ms 3.879 ms 3.439 ms 1.53 KB

Потом можно развернуть внутренний цикл:


MeasureSimilarity
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
        t >>= 8;
        result += CountOfSettedBits[t & 255];
    }
    return result;
}

Method Mean Error StdDev Allocated
RandomTest 370.5 ms 2.802 ms 2.484 ms 1.53 KB
AscendantTest 395.8 ms 2.682 ms 2.509 ms 1.53 KB
DescendantTest 419.5 ms 3.352 ms 2.971 ms 1.53 KB

Можно ли ещё быстрее? Да! Если использовать нововведения из .NET Core 3.0. Хоть эта версия всё ещё в превью, но в ней с самого начала есть реализация некоторых интринсиков. В Intel Intrinsic Guide есть интринсик _mm_popcnt_u64. Который согласно описанию: "Count the number of bits set to 1 in unsigned 64-bit integer a, and return that count in dst.". Это же как раз то, чего мы пытаемся добиться! В .NET Core 3.0 Preview 5 этот интринсик реализован в System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(Как верно заметил в комментариях a-tk перед использованием интринсиков необходимо проверять, что процессор их поддерживает. В данном случае проверять условие System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported). С его использованием код метода MeasureSimilarity станет таким:


public static int MeasureSimilarity(TagsGroup a, TagsGroup b) {
    int result = 0;
    for (int i = 0; i < TagsGroupLength / BucketSize; i++) {
        ulong t = a.InnerTags[i] & b.InnerTags[i];
        result += (int) System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(t);
    }
    return result;
}

а время выполнения:


Method Mean Error StdDev Allocated
RandomTest 59.33 ms 1.148 ms 0.9585 ms 1.53 KB
AscendantTest 74.87 ms 1.479 ms 1.9748 ms 1.53 KB
DescendantTest 119.46 ms 2.321 ms 2.8509 ms 1.53 KB

Впечатляет.
Мне неизвестны способы, которые могут значительно ускорить MeasureSimilarity и при этом не сильно испортить читаемость. Думаю, с этим методом можно закончить.


Структуры данных


Разберёмся теперь с телом цикла в методе GetFiftyMostSimilarGroups:


GetFiftyMostSimilarGroups
public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) {
    const int resultLength = 50;
    List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(50);
    for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) {
        TagsGroup tagsGroup = Groups[groupIndex];
        int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
        TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
        if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) {
            continue;
        }
        int index = ~list.BinarySearch(newInfo);
        list.Insert(index, newInfo);
        if (list.Count > resultLength) {
            list.RemoveAt(resultLength);
        }
    }
    TagsGroup[] result = new TagsGroup[resultLength];
    for (int i = 0; i < resultLength; i++) {
        result[i] = Groups[list[i].Index];
    }
    return result;
}

Напомню коротко, что здесь происходит:


  • внутри list хранится отсортированный список пятидесяти наиболее подходящих групп тегов, фактически от меньшего к большему, если сравнивать TagsSimilarityInfo;
  • вставляем новую рассматриваемую группу в list с сохранением сортировки;
  • если элементов в list больше пятидесяти, то наименее похожую группу удаляем (её инфо-объект будет самым большим и будет находиться в самом конце list);

Т.е. получается, что нам надо очень быстро находить самый большой элемент коллекции, уметь быстро вставлять и удалять. Для решения таких задач существуют специальные структуры данных. Первое что приходит на ум — куча. У неё вставка выполняется за O(log N), получение максимума за O(1), удаление элемента за O(log N). Проблема только в том, что по куче нельзя проитерироваться по возрастанию элементов без её модификации. В BCL бинарной кучи нет, поэтому я написал её сам:


BinaryHeap
public class BinaryHeap<T>:IEnumerable<T> where T : IComparable<T> {
    readonly List<T> innerList;

    public BinaryHeap(int capacity) {
        innerList = new List<T>(capacity);
    }

    public int Count => innerList.Count;

    public T Max => innerList[0];

    public void Add(T value) {
        innerList.Add(value);
        int i = Count - 1;
        int parent = (i - 1) >> 1;

        while (i > 0 && innerList[parent].CompareTo(innerList[i]) == -1) {
            Swap(i, parent);

            i = parent;
            parent = (i - 1) >> 1;
        }
    }

    void Swap(int a, int b) {
        T temp = innerList[a];
        innerList[a] = innerList[b];
        innerList[b] = temp;
    }

    void Heapify(int i) {
        for (;;) {
            int leftChild = (i << 1) | 1;
            int rightChild = (i + 1) << 1;
            int largestChild = i;

            if (leftChild < Count && innerList[leftChild].CompareTo(innerList[largestChild]) == 1) {
                largestChild = leftChild;
            }

            if (rightChild < Count && innerList[rightChild].CompareTo(innerList[largestChild]) == 1) {
                largestChild = rightChild;
            }

            if (largestChild == i) {
                break;
            }
            Swap(i, largestChild);
            i = largestChild;
        }
    }

    public void RemoveMax() {
        innerList[0] = innerList[Count - 1];
        innerList.RemoveAt(Count - 1);
        Heapify(0);
    }

    public IEnumerator<T> GetEnumerator() {
        return innerList.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return ((IEnumerable) innerList).GetEnumerator();
    }
}

соответствующую реализацию метода GetFiftyMostSimilarGroups можно будет найти в исходниках статьи (ссылка внизу).
Кроме кучи может подойти бинарное дерево поиска. Сбалансированное бинарное дерево поиска может обеспечить вставку за O(log N), получение максимума за O(log N), удаление элемента за O(log N). Плюс такой структуры в том, что по ней можно итерироваться в возрастающем порядке и, кроме того, красно-чёрное дерево поиска в BCL реализовано внутри SortedSet (в большом фреймворке получение максимума сильно медленнее, чем в .netcore 3.0, и аллоцирует память). Реализацию GetFiftyMostSimilarGroups для SortedSet можно будет найти в исходниках к статье.
Результаты бенчмарков всех трёх реализаций GetFiftyMostSimilarGroups:


Method Алгоритм сортировки Mean Allocated
RandomTest List 60.06 ms 1704 B
RandomTest SortedSet 65.46 ms 24384 B
RandomTest Heap 60.55 ms 2912 B
AscendantTest List 75.42 ms 1704 B
AscendantTest SortedSet 161.12 ms 9833424 B
AscendantTest Heap 86.87 ms 2912 B
DescendantTest List 119.23 ms 880 B
DescendantTest SortedSet 125.03 ms 3024 B
DescendantTest Heap 118.62 ms 2088 B

Исходная реализация с листом выигрывает почти везде по времени, и точно везде по памяти. Происходит это из-за того, что у алгоритма с листом вставка выполняется за O(log N) на поиск, и почти O(1) на вставку, т.к. копирование такого малого количества элементов происходит очень быстро, получение максимума за O(1), удаление элемента тоже за O(1), т.к. в .net удаление последнего элемента из листа заменяется на запись в последний элемент пустого значения (в .net core для структур даже ничего не записывается). Если бы требовалось выдать не 50, а допустим 1000 наиболее похожих групп, то, скорее всего, алгоритм с листом не подошёл бы. На самом деле всё это немножко спекулятивные рассуждения, т.к. можно каждый из алгоритмов ещё поднастроить.


Многопоточность


Теперь осталось попробовать улучшить сам цикл в GetFiftyMostSimilarGroups. На ум приходит только многопоточность. Идея в том, чтобы разбить весь список групп на несколько пакетов. В каждом пакете найти 50 наиболее похожих групп тегов, а потом среди них найти окончательные 50 самых похожих.
Многопоточная версия GetFiftyMostSimilarGroups выглядит вот так:


GetFiftyMostSimilarGroupsMultiThread
public TagsGroup[] GetFiftyMostSimilarGroupsMultiThread(TagsGroup value) {
    const int resultLength = 50;
//количество потоков, соответственно и количество пакетов
    const int threadsCount = 4;
//размер одного пакета
    int bucketSize = Groups.Length / threadsCount;
    var tasks = new Task<List<TagsSimilarityInfo>>[threadsCount];
    for (int i = 0; i < threadsCount; i++) {
        int leftIndex = i * bucketSize;             //начальный индекс в пакете
        int rightIndex = (i + 1) * bucketSize;      //конечный индекс в пакете
//Создаём и стартуем таску
        tasks[i] = Task<List<TagsSimilarityInfo>>.Factory.StartNew(() => GetFiftyMostSimilarGroupsMultiThreadCore(value, leftIndex, rightIndex));
    }
    Task.WaitAll(tasks);
//сюда сохраним результаты тасок
    var taskResults = new List<TagsSimilarityInfo>[threadsCount];
    for (int i = 0; i < threadsCount; i++) {
        taskResults[i] = tasks[i].Result;
    }
//собираем со всех тасок лучший результат
    return MergeTaskResults(resultLength, threadsCount, taskResults);
}

Метод GetFiftyMostSimilarGroupsMultiThreadCore выглядит один в один как обычный GetFiftyMostSimilarGroups:


GetFiftyMostSimilarGroupsMultiThreadCore
List<TagsSimilarityInfo> GetFiftyMostSimilarGroupsMultiThreadCore(TagsGroup value, int leftIndex, int rightIndex) {
    const int resultLength = 50;
    List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(resultLength);
    for (int groupIndex = leftIndex; groupIndex < rightIndex; groupIndex++) {
        TagsGroup tagsGroup = Groups[groupIndex];
        int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup);
        TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue);
        if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) {
            continue;
        }
        int index = ~list.BinarySearch(newInfo);
        list.Insert(index, newInfo);
        if (list.Count > resultLength) {
            list.RemoveAt(resultLength);
        }
    }
    return list;
}

Самое интересное происходит в MergeTaskResults. Необходимо из инфо-объектов групп тегов в taskResults выбрать пятьдесят наиболее подходящих. Код, написанный здесь, очень похож на сортировку слиянием. Только сливаются не два массива, а threadsCount массивов, но основная идея такая же: на каждом этапе просматриваются некоторые элементы из каждого массива, из них выбирается наиболее подходящий, он записывается в ответ, и это всё повторяется:


MergeTaskResults
TagsGroup[] MergeTaskResults(int resultLength, int threadsCount, List<TagsSimilarityInfo>[] taskResults) {
    TagsGroup[] result = new TagsGroup[resultLength];
    int[] indices = new int[threadsCount]; 
    for (int i = 0; i < resultLength; i++) {
        int minIndex = 0; 
        TagsSimilarityInfo currentBest = taskResults[minIndex][indices[minIndex]];
        for (int j = 0; j < threadsCount; j++) {
            var current = taskResults[j][indices[j]];
            if (current.CompareTo(currentBest) == -1) {
                minIndex = j;
                currentBest = taskResults[minIndex][indices[minIndex]];
            }
        }
        int groupIndex = currentBest.Index;
        result[i] = Groups[groupIndex];
        indices[minIndex]++;
    }
    return result;
}

  • В indices хранятся индексы рассматриваемых позиций из taskResults;
  • minIndex — индекс у taskResults, в котором в данный момент находится наиболее подходящий элемент;
  • currentBest — инфо-объект текущей наиболее подходящей сейчас группы тегов;
  • current — инфо-объект рассматриваемой сейчас группы тегов;

И результаты итогового бенчмарка:


Method Mean Error StdDev Allocated
RandomTest 28.76 ms 0.5677 ms 1.414 ms 1.4 KB
AscendantTest 32.36 ms 0.8930 ms 2.591 ms 1.4 KB
DescendantTest 41.36 ms 0.8908 ms 2.626 ms 1.4 KB

А вот что было в начале:
Method Mean Error StdDev Allocated
RandomTest 25054 ms 1786 ms 1670 ms 1.53 KB
AscendantTest 4180 ms 174 ms 162 ms 1.53 KB
DescendantTest 4147 ms 118 ms 104 ms 1.53 KB

И этого удалось добиться всего лишь рассуждениями и поэтапной эволюцией существующего кода. Естественно код неидеален. Совершенно нет обработки ошибок, а половина методов упадёт, если исходные данные не будут кратны 4 или 50. Но, надеюсь, основные идеи понятны.


Весь код можно найти здесь

Комментарии (40)


  1. a-tk
    05.06.2019 09:49
    +1

    По поводу интринсиков: они будут работать, только если есть поддержка в процессоре. Если запустить этот код на процессоре, где инструкция не поддерживается, получите падение процесса с illegal instruction.
    По-хорошему, надо проверять доступность инструкции и делать fallback на код, который сделает то же самое на более базовом наборе инструкций.


    1. T-D-K Автор
      05.06.2019 09:52

      Знаю. Согласен. Спасибо. Сам писал про интринсики в .net core 3.0 статью. Но тут я решил упростить всё. В конце статьи тоже написано, что нет и обработки ошибок, и проверок. банально всё упадёт, если групп тегов меньше 50.


    1. Soarex16
      06.06.2019 18:01

      Если я правильно трактую документацию Intel и википедию, то _mm_popcnt_u32 была представлена в наборе команд SSE 4.2 в 2007 году и ныне поддерживается большинством процессоров. Так что в современных условиях проверка, конечно, с точки зрения логики важна, но несколько избыточна.
      Хотя при работе с интринсиками в первую очередь стоит ориентироваться на примерные характеристики целевого устройства (или набора устройств).


      1. T-D-K Автор
        06.06.2019 18:02

        Но упомянуть про проверку всё-таки стоило. Ничего плохого в этом нет.


        1. Soarex16
          06.06.2019 18:15

          Не могу не согласится, ибо у самого были случаи, когда находился пара пользователей с крайне специфичным железом :)
          В данном вопросе больше ориентировался на предположение, что данный код будет исполняться на сервере.


          1. T-D-K Автор
            06.06.2019 18:24

            Да. На сервере.
            На самом деле когда я писал это всё для коллеги, он думал гонять на большой фреймворке 4.7 и я туда никакие popcnt не вписывал. Потом показал ему, что на .netcore можно быстрее и он задумался о переезде или хотя бы отпочковывании некоторых функций на .netcore в отдельную библиотеку. Но по-моему там до сих пор ничего не решилось.


        1. Soarex16
          06.06.2019 18:25
          +1

          Прошу прощения, забыл про такую хитрую штуку (слышал о ней от разработчика из Microsoft, смотрел запись его выступления на dotnext), а именно — компилятор сам резолвит данное условное выражение в зависимости от таргета и подставляет соответствующий код. Так что здесь проверка абсолютно никакого оверхеда иметь не будет, но при этом будет полезной.
          Интересно было бы сравнить различные варианты fallback-ов на больших данных.


          1. T-D-K Автор
            06.06.2019 18:28

            Про это видео: www.youtube.com/watch?v=n3-j_sTtGb0? Да, вы правы. Я совсем забыл про то, что JIT может вырубить эти проверки.


            1. Soarex16
              06.06.2019 18:34

              Да, именно оно :)


      1. a-tk
        06.06.2019 20:07
        +1

        Так-то оно так, но есть нюанс. Запустят вашу сборку на каком-нибудь RaspberryPI на ARM, и оно кааак… Ну или на армовом сервере. Или ещё какую экзотику из коробки достанут.


  1. OlehR
    05.06.2019 14:52

    Мне кажиться что на SQL ету задачу можно решить и попроще, и точно будет не медленнее.
    Ведь дание точно есть в базе.


    1. T-D-K Автор
      05.06.2019 15:15

      Предложите решение я с интересом его рассмотрю.


      1. DrunkBear
        06.06.2019 17:40

        Мне почему-то тоже кажется, что словарь тезаурусов на SQL будет выполнять такой поиск за 1 запрос, к тому же, позволит группировать похожие по смыслу тэги как синонимичную связь.
        Но создание словаря и групп — ручная и рутинная работа + лишняя возня с классификацией видео (т.к. метаданные каждого видео нужно обработать, положить в бд и классифицировать ранее не встречавшиеся теги по группам и типам связей)
        Думаю, для миллиона видео это может быть overkill.
        PS зато, добавив немного других типов тезаурусов, можно очень быстро реализовывать функционал «максимально непохожее видео» — все связи омонимы, «частично похожее видео» и просто поиск по нескольким тэгам.


  1. komarovdd
    06.06.2019 14:30

    количество бит можно еще считать так:

    int CountOfBits(int a)
    {
        int result = 0;
        while(a)
        {
             result++;
             a=a&(a-1);
        }
    }
    

    сложность будет O от количества единичных бит
    можно было бы посмотреть на результат с таким подсчетом


    1. T-D-K Автор
      06.06.2019 18:03

      Рассмотрю этот вариант в ближайшее же время. Он скорее всего будет медленее, чем popcnt, но хочется сравнить с предподсчётом битов. Мой вариант составления CountOfSettedBits думаю точно медленее чем ваш код. Спасибо за идею!


      1. thealfest
        09.06.2019 14:15
        +1

        Set — неправильный глагол.


        1. T-D-K Автор
          09.06.2019 14:54

          Ценное замечание. Спасибо большое.


    1. Pochemuk
      06.06.2019 19:57
      +1

      Ну, если уж оптимизировать, то не result++, а ++result. Выполняется быстрее за счет того, что не надо сохранять значение до инкремента для предполагаемого возврата значения.

      Данный алгоритм можно найти в книге «Алгоритмические трюки для программистов», Г. Уоррен мл. Но там написано, что его эффективность проявляется только для «разреженных» чисел с малым количеством единиц. Впрочем, список тэгов таким и будет являться.


      1. a-tk
        06.06.2019 20:09
        +1

        Отброшенное (не использованное) значение будет заоптимизировано. Здесь не имеет значения такая микрооптимизация (которой ещё и нет).


        1. Pochemuk
          06.06.2019 20:33

          Ви, таки, не поверите, но я тоже не верил раньше.

          Я вот не программист, но когда прочитал про это, захотел убедиться лично. Скачал и установил Visual Studio. На C++ написал вложенные циклы из большого количества инкриментов тем и другим способом. Скомпилировал с включенным оптимизатором.

          Разница была такая существенная, что просто поразила.


          1. a-tk
            06.06.2019 22:16

            Вы их просто инкрементировали или результат куда-то сохраняли для дальнейших манипуляций? Предполагаю второе.


            1. Pochemuk
              06.06.2019 22:47

              Просто инкрементировал.


          1. T-D-K Автор
            06.06.2019 22:30

            Prefix 30.73 ms
            Postfix 30.86 ms


            Postfix


            00007fff`97e8617f mov rax,0FFFFFFFFFFFFFFFFh
            00007fff`97e86189 xor     edx,edx
            00007fff`97e8618b inc     edx
            00007fff`97e8618d lea     rcx,[rax-1]
            00007fff`97e86191 and     rax,rcx
            00007fff`97e86194 test    rax,rax
            00007fff`97e86197 jne     00007fff`97e8618b
            00007fff`97e86199 add     esi,edx

            Prefix


            00007fff`97e996ef mov rax,0FFFFFFFFFFFFFFFFh
            00007fff`97e996f9 xor     edx,edx
            00007fff`97e996fb inc     edx
            00007fff`97e996fd lea     rcx,[rax-1]
            00007fff`97e99701 and     rax,rcx
            00007fff`97e99704 test    rax,rax
            00007fff`97e99707 jne     00007fff`97e996fb
            00007fff`97e99709 add     esi,edx

            код
                public class Benchmark {
                    static int CountOfBitsPostfix(ulong a) {
                        int result = 0;
                        while (a != 0) {
                            result++;
                            a &= a - 1;
                        }
                        return result;
                    }
            
                    static int CountOfBitsPrefix(ulong a) {
                        int result = 0;
                        while (a != 0) {
                            ++result;
                            a &= a - 1;
                        }
                        return result;
                    }
            
                    [Benchmark]
                    public int Prefix() {
                        int result = 0;
                        for (int i = 0; i < 1000000; i++) {
                            result += CountOfBitsPrefix(ulong.MaxValue);
                        }
                        return result;
                    }
            
                    [Benchmark]
                    public int Postfix() {
                        int result = 0;
                        for (int i = 0; i < 1000000; i++) {
                            result += CountOfBitsPostfix(ulong.MaxValue);
                        }
                        return result;
                    }        
            
                }


            1. Pochemuk
              06.06.2019 22:49

              Ну что ж… Возможно, что я пользовался VS устаревшим оптимизатором. Она еще на WinXP работала.


      1. komarovdd
        06.06.2019 20:45

        оптимизировать, то не result++, а ++result

        Надо бы посмотреть на результирующий ассемблер, есть подозрение что итог будет одинаковый.

        эффективность проявляется только для «разреженных» чисел с малым количеством единиц

        А я так и написал, сложность будет O от количества единичных бит :)


    1. T-D-K Автор
      06.06.2019 21:52

      Результаты большого бенчмарка с вашим кодом в MeasureSimilarity:
      RandomTest и Descendant = 1.21 c, Ascendant = 1.1 с. Что всего лишь в два раза хуже, чем вариант с предподсчётом. Можно как минимум алгоритм предподсчёта заменить вашим. Спасибо!


  1. Pochemuk
    06.06.2019 15:12

    Чем больше тегов у видео совпадает, тем более они похожи.

    Не совсем понял смысл этого предложения, а в коде разбираться — опыта нет…

    Что значит «больше тегов»? Имеется в виду абсолютное число совпадающих тэгов (пар) или относительное?

    Поясню на примере…

    Вот есть два видео А и Б и у каждого ровно один тэг:

    А — #драма
    Б — #драма

    Как видим, число совпадающих пар тэгов — 1. Относительное — тоже 1.

    Но возьмем другую пару видео:

    В — #драма, #мелодрама, #криминал, #боевик
    Г — #драма, #мелодрама, #детектив, #нуар

    Абсолютное число совпадающих пар — 2.
    А вот относительное число можно подсчитать, аналогично доле совпадающих N-грамм в N-граммном анализе: 2*(чило_совпадающих_тэгов)/(число_тэгов_в_первом + число_тэгов_во_втором):

    2*2/(4+4)=0,5

    Т.е. абсолютное число совпадающих тэгов стало больше, а относительное — меньше.

    И мне кажется, пользоваться надо как раз относительной величиной, т.к. она учитывает не только чем фильмы похожи, но и насколько они различаются.


    1. T-D-K Автор
      06.06.2019 16:04

      Во втором вашем случае MeasureSimilarity вернёт 2. Моему коллеге показалось, что этого достаточно. Но и такой алгоритм как у вас неплохо ложится на интринсики и будет совсем чуть медленее текущего. Я вашу идею передам коллеге для экспериментов.


      1. Pochemuk
        06.06.2019 16:39

        Простите, не владею терминологией по теме. Интринсики — это куда?


        1. komarovdd
          06.06.2019 17:18

          Это одна из оптимизаций применённых автором. встроенная в компилятор функция.
          В данном случае функция подсчёта единичных бит в числе. Но как и говорилось, ваш подход не сильно изменит код и не повлияет на оптимизации. Но ваш подход идеалогически мне нравится больше.


        1. T-D-K Автор
          06.06.2019 17:48

          Выше вам уже ответили, но дополню, что определение можно почитать здесь:
          en.wikipedia.org/wiki/Intrinsic_function
          Рассматриваемый в статье интринсик можно найти тут: software.intel.com/sites/landingpage/IntrinsicsGuide/#othertechs=POPCNT&expand=4373
          Там есть псевдокод описания операции и много всего другого. Идея в том, что вся логика подсчёта количества выставленных битов заменяется на вызов одной ассемблерной инструкции popcnt r64, r64, а дальше оно уже как-то сделано в процессоре и отрабатывает быстрее, чем если это писать руками. Но процессор может не поддеживать эту инструкцию и нужны дополнительные проверки. В .NetCore 3.0 для всего этого сделали очень удобные механизмы.


  1. Peter03
    06.06.2019 17:20

    del


  1. iluxa1810
    07.06.2019 09:51

    На мой взгляд, можно было бы делегировать эту задачу SQL.

    1. Он оптимизирован для работы с большими массивами данных
    2. Многие моменты он сам оптимизирует. Например, видя индексы он может выбрать соединение слиянием. Может еще и параллелизм добавить


    Конечно при использовании SQL возникают вопросы лицензии…


    1. Pochemuk
      07.06.2019 11:12

      А по каким признакам предполагаете индексировать? По самим тэгам? Так их 4096. Многовато будет для индексации.


    1. a-tk
      07.06.2019 13:52
      +1

      Предложите реализацию, можно отдельной статьёй. Заодно было бы интересно сравнить быстродействие.


    1. Pochemuk
      07.06.2019 16:23
      +1

      Кстати, решить это на SQL было бы тоже занимательно.

      Конечно, индексировать по 4096 индексам невозможно. Ну не поддерживает SQL такого числа индексов у одной таблицы.
      Но можно попытаться сделать вот что: Т.к. многие тэги будут идти «в наборе», то можно сначала кластеризировать фильмы в несколько типичных кластеров. Вернее, не фильмы, а их наборы тэгов.

      И делать двухстадийную выборку. На первом шаге выбирать фильмы, которые могут принадлежать тому же кластеру, что и эталон, а потом уже из этой выборки выбрасывать лишние.

      Например, есть два кластера, в один из которых (К1) входит среди прочих тэг #драма, а в другой (К2) — тэг #фэнтези (тоже среди прочих). И много еще прочих кластеров, но они нам не интересны.

      Если эталонный фильм имеет эти два тэга, то в первичную выборку попадают только фильмы, принадлежащие хотя бы одному из этих кластеров. А потом уже в ней делаем более точное вычисление релевантности.


      1. Pochemuk
        07.06.2019 16:36

        P.S. Если говорить о кластеризации, то это само по себе интересная задача.

        Прежде всего тем, что кластеризировать придется в неевклидовом пространстве. Чтобы не заниматься этим в евклидовом, но 4096-мерном.

        Но этого мало… Дело в том, что сама метрика невнятная. Что принимать за расстояние между тэгами?

        Я вот предлагаю принять за расстояние величину -ln(K), где K — ДОЛЯ фильмов, в которых эти два тэга встретились одновременно.
        Тогда, если какие-то два тэга одновременно не встречаются никогда, то расстояние равно +Inf. А если встречаются только вместе постоянно, то расстояние равно 0 (они, как бы, эквивалентны).

        Может быть есть другие предложения по метрике?


      1. Pochemuk
        09.06.2019 11:27
        +1

        Немного тупанул… Забыл про такую удобную вещь, как расстояние редактирования.

        В случае использования хранимых процедур SQL, удобнее будет выдавать в выборку только фильмы, для которых расстояние Левенштейна с эталонным набором тэгов не превышает, допустим, 2.

        Правда, это не отменяет возможности (а может быть и необходимости) применения кластеризации для предварительной выборки.

        Кстати, для каждого тэга можно указывать его «вес» в соответствующем кластере тэгов. Т.е. долю фильмов/роликов с данным тэгом среди общего числа фильмов/роликов, обладающими хотя бы одним тэгом из этого же кластера тэгов.
        Тогда поиск можно начинать среди кластеров, имеющих наибольший вес в своем кластере. Это позволит лучше находить в первую очередь ролики по редким тэгам.

        Правда, как сделать эту приоритетность поиска средствами самого SQL — не знаю. Яжнепрограммист :)


  1. ignorance
    09.06.2019 16:16
    +1

    Немного жаль, что до AVX-512 нет метода для подсчета количества единичных бит даже в 128-битном регистре. Можно было бы на них тогда перейти.
    Единственное мелкое улучшение, которое я вижу — код
    list.Insert(index, newInfo);
    if (list.Count > resultLength) {
    list.RemoveAt(resultLength);
    }
    имеет смысл заменить на
    if (list.Count == resultLength) {
    list.RemoveAt(resultLength);
    }
    list.Insert(index, newInfo);

    В первом варианте список временно может вырасти до 51 элемента, что больше Capacity и повлечет перевыделение памяти.

    Раньше CompareTo() == -1 заменили бы на CompareTo() < 0, поскольку сравнение с 0 было на 1 инструкцию короче, но сейчас это вряд ли имеет значение.


    1. T-D-K Автор
      09.06.2019 16:19

      В первом варианте список временно может вырасти до 51 элемента, что больше Capacity и повлечет перевыделение памяти.

      Согласен, но только один раз и, имхо, можно пренебречь. Хотя соглашусь, что проглядел такую оптимизацию.


      CompareTo() == -1 заменили бы на CompareTo() < 0

      Тут надо смотреть что будет после JIT компиляции, но спасибо.