В этой статье разбирается решение задачи «Гистограммы» с контеста Route 256 от Ozon.
Ссылочки:
Assembler в Go: техники ускорения и оптимизации / Хабр
Руководство по ассемблеру Go / Хабр
Часть 1. Почему Go-ассемблер и векторизация могут быть полезны: идея для ускорения / Хабр
Условие задачи
Гистограммой является массив, каждый элемент которого указывает высоту столбика на соответствующей позиции. Две гистограммы считаются совпадающими, если при совмещении одной гистограммы с другой гистограммой, повёрнутой на угол 180°, получается ровный прямоугольник без наложений и пропусков.
Иллюстрация

Пример теста
1 2 4
1 3 4
1 4 3
Ответ: 1 пара
Тривиальное решение:
func profile(a []uint32) string {
n := len(a)
res := make([]uint32, n-1)
for i := 1; i < n; i++ {
res[i-1] = a[i] - a[i-1]
}
b := unsafe.Slice((*byte)(unsafe.Pointer(&res[0])), (n-1)*4)
return *(*string)(unsafe.Pointer(&b))
}
func reverseProfile(a []uint32) string {
n := len(a)
res := make([]uint32, n-1)
for i := n - 1; i > 0; i-- {
res[n-1-i] = a[i] - a[i-1]
}
b := unsafe.Slice((*byte)(unsafe.Pointer(&res[0])), (n-1)*4)
return *(*string)(unsafe.Pointer(&b))
}
func processHistograms(histos [][]uint32) int {
profCount := make(map[string]int)
for i := 0; i < len(histos); i++ {
p := profile(histos[i])
profCount[p]++
}
res := 0
for i := 0; i < len(histos); i++ {
rp := reverseProfile(histos[i])
res += profCount[rp]
// если профиль и реверс совпадают, не считать пару (a,a)
if profile(histos[i]) == rp {
res--
}
}
return res / 2
}
Мы вычисляем приросты гистограмм и находи соответствующие пары, приросты которых совпадают. Ничего сложного.
Это решение проходит тесты с контеста, но я решил попробовать ускорить вычисление с помощью SIMD.
Ускорить func profile
довольно просто.
// func calcDifferences(src []int32, dst []int32)
// комменты от GPT-4.1
TEXT ·calcDifferences(SB), $0-48
MOVQ src_base+0(FP), SI // Загружаем базовый адрес исходного массива
MOVQ src_len+8(FP), CX // Загружаем длину исходного массива
MOVQ dst_base+24(FP), DI // Загружаем базовый адрес массива назначения
CMPQ CX, $1 // Проверяем, если длина <= 1
JLE done // Если да, завершаем работу
DECQ CX // Длина dst = длина src - 1
XORQ AX, AX // Инициализируем счетчик индекса нулем
// Проверяем, достаточно ли элементов для AVX обработки (минимум 8)
CMPQ CX, $8
JL scalar_loop // Если меньше 8, переходим к скалярной обработке
// Вычисляем количество элементов для AVX обработки (кратно 8)
MOVQ CX, DX
ANDQ $-8, DX // Округляем вниз до ближайшего кратного 8
avx_loop:
VMOVDQU (SI)(AX*4), Y0 // Загружаем 8 элементов из src в YMM0
VMOVDQU 4(SI)(AX*4), Y1 // Загружаем следующие 8 элементов (со сдвигом +1)
VPSUBD Y0, Y1, Y2 // Вычисляем разницы (Y1 - Y0)
VMOVDQU Y2, (DI)(AX*4) // Сохраняем 8 результатов в dst
ADDQ $8, AX // Увеличиваем индекс на 8
CMPQ AX, DX // Сравниваем с количеством обработанных элементов
JL avx_loop // Продолжаем цикл, если есть элементы для AVX обработки
// Обрабатываем оставшиеся элементы скалярно
CMPQ AX, CX
JGE done // Если все элементы обработаны, завершаем
scalar_loop:
MOVL (SI)(AX*4), BX // Загружаем src[i] в регистр BX
MOVL 4(SI)(AX*4), DX // Загружаем src[i+1] в регистр DX
SUBL BX, DX // Вычисляем разницу: DX = src[i+1] - src[i]
MOVL DX, (DI)(AX*4) // Сохраняем результат в dst[i]
INCQ AX // Увеличиваем индекс
CMPQ AX, CX // Сравниваем с длиной массива
JL scalar_loop // Продолжаем, если индекс < длины
done:
RET // Возврат из функции
// Стандартная реализация на Go для проверки
func CalcDifferencesGo(arr []uint32) []uint32 {
if len(arr) <= 1 {
return []uint32{}
}
result := make([]uint32, len(arr)-1)
for i := 0; i < len(arr)-1; i++ {
result[i] = arr[i+1] - arr[i]
}
return result
}
//go:noescape
func calcDifferences(src []uint32, dst []uint32)
// CalculateGrowth - ускоренная реализация на SIMD
func CalcDifferencesASM(arr []uint32) []uint32 {
if len(arr) <= 1 {
return []uint32{}
}
if len(arr) <= 4 {
return CalcDifferencesGo(arr)
}
result := make([]uint32, len(arr)-1)
calcDifferences(arr, result)
return result
}
Как это работает
Мы сохраняем в векторный регистр Y0 восемь значений по 32 бита, а регистр Y1 со смешением на один элемент. Получив векторную разницу между регистрами, мы получим 8 элементов прироста гистограмм сразу.

«Хвост» данных, которые не войдут полностью в векторный регистр, обрабатываем скалярно.
Полученные результаты для длин гистограмм от 10 до 100000
Ускорение в ~2 раза (а хотелось бы в 4~8)
goos: windows
goarch: amd64
pkg: qwe
cpu: AMD Ryzen 5 8400F 6-Core Processor
Benchmark_profile/Go-10-12 67875990 19.03 ns/op
Benchmark_profile/SIMD-10-12 66618182 18.76 ns/op
Benchmark_profile/Go-100-12 13702243 92.77 ns/op
Benchmark_profile/SIMD-100-12 15909126 77.92 ns/op
Benchmark_profile/Go-1000-12 1294166 954.4 ns/op
Benchmark_profile/SIMD-1000-12 1835912 636.4 ns/op
Benchmark_profile/Go-10000-12 144406 7867 ns/op
Benchmark_profile/SIMD-10000-12 277306 4469 ns/op
Benchmark_profile/Go-100000-12 19545 63121 ns/op
Benchmark_profile/SIMD-100000-12 43426 28094 ns/op
Ускорение func reverseProfile
чуть сложнее
Нам нужно не только делать векторное вычитание, но и перестановку элементов в векторе.
// func calcReverseDifferences(src []int32, dst []int32)
// комменты от GPT-4.1
TEXT ·calcReverseDifferences(SB), $0-48
MOVQ src_base+0(FP), SI // SI = &src[0]
MOVQ src_len+8(FP), CX // CX = len(src)
MOVQ dst_base+24(FP), DI // DI = &dst[0]
CMPQ CX, $1 // Проверяем минимальную длину
JLE done
DECQ CX // CX = len(dst) = len(src)-1
MOVQ CX, AX // AX будет индексом в dst (идем с конца)
XORQ DX, DX // DX = индекс в src (идем с начала)
// Проверяем, достаточно ли элементов для AVX
CMPQ CX, $8
JL scalar_loop_start
// AVX-обработка (блоками по 8 элементов)
MOVQ CX, R8
ANDQ $-8, R8 // R8 = количество элементов, кратных 8
MOVQ R8, R9 // R9 = счетчик обработанных элементов
avx_loop:
// Загружаем текущие 8 элементов
VMOVDQU (SI)(DX*4), Y0 // Y0 = src[DX..DX+7]
// Загружаем следующие 8 элементов (сдвиг на 1)
VMOVDQU 4(SI)(DX*4), Y1 // Y1 = src[DX+1..DX+8]
// Вычисляем разницы
VPSUBD Y0, Y1, Y2 // Y2 = разницы
// Реверсируем порядок элементов
VPERM2I128 $0x01, Y2, Y2, Y3 // Меняем местами 128-битные половины
VPSHUFD $0x1B, Y3, Y3 // Реверсируем порядок в каждой половине
// Сохраняем в обратном порядке
MOVQ AX, R10
SUBQ $8, R10 // R10 = AX-7 (начальная позиция для записи)
VMOVDQU Y3, (DI)(R10*4) // Сохраняем 8 элементов
ADDQ $8, DX // Увеличиваем счетчик src
SUBQ $8, AX // Уменьшаем счетчик dst
CMPQ DX, R8 // Проверяем завершение AVX-цикла
JL avx_loop
scalar_loop_start:
DECQ AX
scalar_loop:
CMPQ DX, CX // Проверяем завершение
JGE done
MOVL (SI)(DX*4), BX // Загружаем src[i]
MOVL 4(SI)(DX*4), BP // Загружаем src[i+1]
SUBL BX, BP // Вычисляем разницу
MOVL BP, (DI)(AX*4) // Сохраняем в dst
INCQ DX // Следующий элемент src
DECQ AX // Предыдущая позиция dst
JMP scalar_loop
done:
RET
// Стандартная реализация на Go для проверки
func CalcReverseDifferencesGo(arr []uint32) []uint32 {
if len(arr) <= 1 {
return []uint32{}
}
result := make([]uint32, len(arr)-1)
for i := len(arr) - 1; i > 0; i-- {
result[len(arr)-1-i] = arr[i] - arr[i-1]
}
return result
}
//go:noescape
func calcReverseDifferences(src []uint32, dst []uint32)
// CalculateGrowth - обертка, которая создает массив назначения
func CalReversecDifferencesASM(arr []uint32) []uint32 {
if len(arr) <= 1 {
return []uint32{}
}
if len(arr) <= 4 {
return CalcReverseDifferencesGo(arr)
}
result := make([]uint32, len(arr)-1)
calcReverseDifferences(arr, result)
return result
}
Все также как и с calcDifferences
но другой порядок работы с индексами
VPERM2I128 $0x01, Y2, Y2, Y3
Эта инструкция берет два 256-битных регистра (в данном случае оба — Y2) и переставляет их 128-битные половины.
VPSHUFD $0x1B, Y3, Y3
Эта инструкция переставляет 32-битные элементы внутри каждой 128-битной половины регистра Y3.
Иллюстрация

Полученные результаты ускорения идентичны calcDifferences
goos: windows
goarch: amd64
pkg: qwe
cpu: AMD Ryzen 5 8400F 6-Core Processor
Benchmark_reverse_profile/Go-10-12 57465208 20.66 ns/op
Benchmark_reverse_profile/ASM-10-12 61966186 19.28 ns/op
Benchmark_reverse_profile/Go-100-12 11149742 107.8 ns/op
Benchmark_reverse_profile/ASM-100-12 15162312 76.59 ns/op
Benchmark_reverse_profile/Go-1000-12 1000000 1022 ns/op
Benchmark_reverse_profile/ASM-1000-12 1756675 679.6 ns/op
Benchmark_reverse_profile/Go-10000-12 129410 9519 ns/op
Benchmark_reverse_profile/ASM-10000-12 208200 4928 ns/op
Benchmark_reverse_profile/Go-100000-12 15706 76804 ns/op
Benchmark_reverse_profile/ASM-100000-12 38287 30263 ns/op
Итого совокупный прирост для набора гистограмм с 200000 парами составил
2.80 раза (180.07% быстрее)
func TestHistogramProcessing(t *testing.T) {
// Инициализация генератора случайных чисел
seed := time.Now().UnixNano()
rand.Seed(seed)
t.Logf("Используем seed: %d", seed)
const lengthHistograms = 10_000
const numHistograms = 10_000
const numPair = 20
const numRandom = 10000
var Histogram [][]uint32
for i := 0; i < numHistograms; i++ {
qwe, asd := generatePairedHistograms(lengthHistograms, numPair)
Histogram = append(Histogram, asd...)
Histogram = append(Histogram, qwe)
}
// Создаем случайных гистограмм
for i := 0; i < numRandom; i++ {
Histogram = append(Histogram, generateRandomHistogram(lengthHistograms))
}
// Замер времени для processHistograms
start1 := time.Now()
result1Paired := processHistograms(Histogram)
duration1 := time.Since(start1)
// Замер времени для processHistogramsAVX
start2 := time.Now()
result2Paired := processHistogramsAVX(Histogram)
duration2 := time.Since(start2)
if result1Paired != result2Paired {
t.Errorf("Результаты для парных гистограмм не совпадают: processHistograms=%d, processHistogramsAVX=%d",
result1Paired, result2Paired)
} else {
fmt.Printf("Результаты для парных гистограмм совпадают: %d \n", result1Paired)
t.Logf("Результаты для парных гистограмм совпадают: %d", result1Paired)
}
// Выводим замеры времени
t.Logf("Время выполнения processHistograms: %v", duration1)
t.Logf("Время выполнения processHistogramsAVX: %v", duration2)
// Вычисляем во сколько раз одна функция быстрее другой
var faster, slower time.Duration
var fasterName, slowerName string
if duration1 < duration2 {
faster, slower = duration1, duration2
fasterName, slowerName = "processHistograms", "processHistogramsAVX"
} else {
faster, slower = duration2, duration1
fasterName, slowerName = "processHistogramsAVX", "processHistograms"
}
speedup := float64(slower) / float64(faster)
percentDiff := (speedup - 1) * 100
t.Logf("%s быстрее %s примерно в %.2f раза (%.2f%% быстрее)",
fasterName, slowerName, speedup, percentDiff)
fmt.Printf("%s быстрее %s примерно в %.2f раза (%.2f%% быстрее)\n",
fasterName, slowerName, speedup, percentDiff)
}
en666ki
Отличная статья, подойдет как новеньким, так и опытным разработчикам на Go