PyTorch — это мощный и гибкий фреймворк для машинного обучения, широко используемый для создания нейронных сетей. Он особенно популярен благодаря простоте использования, динамическим вычислительным графам и богатой экосистеме инструментов для обучения моделей. Для использования этого фреймворка, часто достаточно поверхностно понимать работу алгоритмов машинного обучения.

Но Андрей Карпаты, известный исследователь в области ИИ, считает, что реализация алгоритмов с нуля позволяет понять их суть и детали работы, что сложно осознать, используя только готовые библиотеки. Это помогает развить интуицию для дальнейшего применения и улучшения методов. Андрей посвящает много собственного времени, чтобы объяснять ключевые принципы работы нейросетей в своих блогах и на своём ютуб-канале. Он также не раз подчеркивал, что на его курсе в Cтэнфорде есть задачи по реализации различных алгоритмов, например, обратное распространение.

Я хотел бы посвятить данную статью этой идеи, потому что мне самому особенно интересно копаться в алгоритмах глубокого обучения. Эта статья продолжение первой статьи

Сегодня мы:

  • добавим CNNBatchNormMaxPoolMinPool

  • реализуем RMSPropNaGAdam

  • добавим регуляризацию в loss-функцию

  • добавим новые функции активации

  • напишем DataLoader

Поехали!

Мы остановились на том, что сделали достаточно похожую на PyTorch обертку.

loss_fn = CrossEntropyLoss()
model = SimpleNet()
optim = SGD(model.parameters(), learning_rate = 0.01)

for i in range(100):
    output = model(input_x)
    loss = loss_fn(output, target_x)
    loss.backward()
    optim.step()

Давайте теперь попытаемся обучить на каких-нибудь реальных данных! Для этой задачи нам подойдет набор рукописных цифр MNIST. Но как нам эффективно обработать весь датасет? Вспомним как это делается в PyTorch. Мы создаем объект класса DataLoader - который позволяет разбить весь датасет на батчи и работает как итерируемый объект. Давайте продолжим создавать собственные реализации.

Давайте хранить ссылку на наш набор в переменной dataset. Например

import pandas as pd
import numpy as np
data_pd = pd.read_csv(path)
dataset = np.array(data_pd).astype(float)
np.random.shuffle(dataset) # перемешаем датасет
dataset.shape
>>> (5000, 785) # 1 значение класса + 784 значения пискелей

Dataloader

class DataLoader():
    def __init__(self, data, batch_size=64, shuffle=True, flatten = True):
        self.data = data
        self.index = 0 # индекс нужен для корректной итерации
        self.items = [] # здесь будем хранить наборы батчей
        self.flatten = flatten

		# определяем сколько раз сможем проитерировать весь набор
        self.max = data.shape[0] // batch_size + (1 if data.shape[0] % batch_size != 0 else 0)

        if shuffle == True:
	        # мешаем набор если захотел пользователь
            self.data = np.random.permutation(self.data)
		# создаем список всех батчей
        for _ in range(self.max):
            self.items.append(self.data[batch_size * _: batch_size * (_ + 1)])
            
    # превращаем в итерируемый объект
	def __iter__(self):
        return self

    # для циклов for и while
    def __next__(self):
        if self.index < self.max:
            value = self.items[self.index] # получаем батч
            self.index += 1
            if self.flatten: 
                # возвращаем в нашем случае либо ((64, 784), (64, 1))
                return value[:, 1:], value[:, 0].reshape(-1, 1)
            else:
		        # либо ((64, 28, 28), (64, 1)), то есть как изображение
                return value[:, 1:].reshape(value.shape[0], 28, 28), value[:, 0].reshape(-1, 1)
        else:
            self.index = 0
            raise StopIteration # выходим из цикла

Теперь вся загрузка набора данных по батчам выглядит так:

data_loader = DataLoader(dataset, batch_size=64, shuffle=True, flatten=True)

Давайте посмотрим на наш батч.

for x, target in dataloader:
	break
target.shape
>>> (64, 1)

Странно, у нас вроде 10 классов, а размер почему-то 1. Заглянем внутрь

target[0]
>>> array([8.])

Это номер класса, а мы ведь ждали, что будет [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
Тогда давайте обработаем такой случай внутри нашего класса CrossEntropyLoss

class CrossEntropyLoss:

    def __call__(self, logits, true):
        predicted = np.exp(logits) / np.sum(np.exp(logits), axis=1).reshape(-1, 1) # softmax
        self.predicted = np.array(predicted, copy=True) # сделаем копию входных матрицы для дальнейших вычислений
        ### Добавили эти строчки
        number_of_classes = predicted.shape[1]
		self.true = np.int_(np.arange(0, number_of_classes) == true) 
		###
        # вычисляем значение лосс-функции прямо по формуле
        self.loss = -1 * np.sum(self.true * np.log(self.predicted + 1e-5), axis=1)
        return self

Теперь попробуем обучить нашу нейронку. Создадим модель

class SimpleNet(Module):
    def __init__(self):
        super().__init__()
        self.linear1 = Linear(input_channels=784, output_channels=200, bias=True)
        self.linear2 = Linear(input_channels=200, output_channels=50, bias=True)
        self.linear3 = Linear(input_channels=50, output_channels=10, bias=True)
        self.relu = ReLU()

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        return x

Инициализируем всё необходимое

loss_fn = CrossEntropyLoss()
model = SimpleNet()
optim = SGD(model.parameters(), learning_rate = 0.01)

Для нашего набора в 5000 экземпляров получится 5000 // 64 = 78 батчей. Пройдемся, например, 5 раз по нашему датасету, то есть у нас будет 5 эпох.

for i in range(5):
    for batch in data_loader:
        input_x, target = batch
        input_x = input_x / 255 # нормируем на [0, 1], иначе обучение может стать нестабильным
        output = model(input_x)
        loss = loss_fn(output, target)
        loss.backward()
        optim.step()
    print(loss.loss.mean(), i) # Берем среднее значение лосса по батчу 
>>>0.622 0
0.294 1
0.166 2
0.101 3
0.070 4

Значение loss-функции уменьшается, значит наша модель обучается на нашем наборе данных!

Давайте пойдем дальше и добавим новые слои! В первую очередь я хотел бы добавить свёрточные слои, так как именно они хорошо справляются с сложными изображениями. Только лишь на линейных слоях мы далеко не уйдем.

CNN

Операция свертки состоит в сворачивании изображения с помощью фильтра. Фильтр или kernel — это тоже матрица чисел.

Свертка изображения размера 4×4 фильтром размера 3×3: Получ енный результат называется картой активации. Свертка картинки шаг за шагом:

Convolution
Convolution

Заметим, что размер карты активации после свертки стал меньше, чем размер изначального изображения. Общая формула размера карты активации такая:

m = (i − f + 2 * padding) / stride + 1,
где:

  • m — размер карты активации;

  • i — размер изображения;

  • f — размер фильтра.

  • padding — отступ от края

  • stride — шаг фильтра

Рекомендую ознакомиться с хорошим ноутбуком, чтобы понять интуицию свёрток

Как будет выглядеть свёрточная сеть для классификации картинок:

У нас уже есть слои Flatten и Linear. Остается только Conv2d.

Давайте попробуем реализовать простую операцию свертки одной картинки и одного фильтра.

image = np.array([[0, 50, 0, 29],
        [0, 80, 31, 2], 
        [33, 90, 0, 75],
        [0, 9, 0, 95]
        ])
kernel = np.ones((3, 3))

image, kernel
>>>array([[ 0, 50,  0, 29],
        [ 0, 80, 31,  2],
        [33, 90,  0, 75],
        [ 0,  9,  0, 95]]),
 array([[1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.]])
i = image.shape[0]
f = kernel.shape[0]
padding = 0
step = 1
m = (i - f + 2*padding) // step + 1
>>> 2
new_image = np.zeros((m, m))

for y in range(m):
    for x in range(m):
        start_x = x * step
        end_x = start_x + f
        start_y = y * step
        end_y = start_y + f
        new_image[y][x] = np.sum(image[start_y:end_y, start_x:end_x] * kernel)
new_image
>>> array([[284., 357.],
       [243., 382.]])

Проверим с библиотечной реализацией.

import scipy.signal
scipy.signal.fftconvolve(image, kernel, mode='valid')
>>>array([[284., 357.],
       [243., 382.]])

В дальнейшем мы будем использовать для операции свёртки именно scipy.signal.fftconvolve, потому что это весьма ресурсоемкая операция, а данный метод позволяет посчитать тоже самое быстрее с помощью быстрого преобразования Фурье!

Давайте попробуем цветную картинку, то есть не (height, width), а (channels, height, width)

Спойлер: есть два способа производить свертку многоканальной картинки

Мы будем использовать первый метод, так как он наиболее часто используемый.
Сгенерируем картинку и фильтр

image = np.random.randn(3, 7, 7)
kernel = np.ones((3, 3, 3))

Реализация операции почти никак не поменяется

i = image.shape[1]
f = kernel.shape[1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
new_image = np.zeros((m, m))
for y in range(m):
    for x in range(m):
        start_x = x * step
        end_x = start_x + f
        start_y = y * step
        end_y = start_y + f
        new_image[y][x] = np.sum(image[:, start_y:end_y, start_x:end_x] * kernel)

Сравниваем!

np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid'))
>>> True

Круто! Теперь давайте сделаем операцию свёртки на нескольких картинках одним фильтром. Опять совсем немного усложним код

image = np.random.randn(10, 3, 7, 7)
kernel = np.ones((1, 3, 3, 3))

i = image.shape[-1]
f = kernel.shape[-1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
number_of_images = image.shape[0]
new_image = np.zeros((number_of_images, m, m))
for image_n in range(number_of_images):
    for y in range(m):
        for x in range(m):
            start_x = x * step
            end_x = start_x + f
            start_y = y * step
            end_y = start_y + f
            new_image[image_n][y][x] = np.sum(image[image_n,:, start_y:end_y, start_x:end_x] * kernel)

Проверим

np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1))
>>> True

Ура, получилось, теперь давайте наоборот, пройдемся несколькими фильтрами по одной картинке!

image = np.random.randn(1, 3, 7, 7)
kernel = np.ones((5, 3, 3, 3))

i = image.shape[-1]
f = kernel.shape[-1]
step = 1
m = (i-f) // step +1
number_of_kernels = kernel.shape[0]
new_image = np.zeros((number_of_kernels, m, m))
for kernel_n in range(number_of_kernels):
    for y in range(m):
        for x in range(m):
            start_x = x * step
            end_x = start_x + f
            start_y = y * step
            end_y = start_y + f
            new_image[kernel_n][y][x] = np.sum(image[0, :, start_y:end_y, start_x:end_x] * kernel[kernel_n])

Снова сравниваем!

np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1))
>>> True

Это очень круто! А теперь давайте соберём это всё и реализуем операцию свертки нескольких фильтров на нескольких картинках!

image = np.random.randn(10, 1, 3, 7, 7)
kernel = np.ones((1, 5, 3, 3, 3))

i = image.shape[-1]
f = kernel.shape[-1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
number_of_kernels = kernel.shape[1]
number_of_images = image.shape[0]
new_image = np.zeros((number_of_images, number_of_kernels, m, m))
for image_n in range(number_of_images):
    for kernel_n in range(number_of_kernels):
        for y in range(m):
            for x in range(m):
                start_x = x * step
                end_x = start_x + f
                start_y = y * step
                end_y = start_y + f
                new_image[image_n][kernel_n][y][x] = np.sum(image[image_n, 0, :, start_y:end_y, start_x:end_x] * kernel[0, kernel_n])
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=2))
>>> True

Очень крутой результат! Мы с вами смогли полностью реализовать одну из базовых и самых важных операций в нейронных сетях.

В PyTorch за операцию свертки отвечают слои nn.Conv1d, nn.Conv2d, nn.Conv3d - логика у них у всех одинаковая, только немного отличается обработка. Например, мы реализовали nn.Conv2d, если бы мы делали метод 2 на предыдущей картинке, то получили бы nn.Conv1d. Теперь давайте соберем все знания в один класс.

Note!
Давайте условимся, что картинка в этот слой будет подаваться в виде (batch, channels, height, width)

Например, 1 черно-белая картинка должна иметь формат (1, 1, 28, 28)
Также для операции в PyTorch можно использовать разные значения padding и stride. Можно сделать реализацию операции свёртки учитывая эти параметры, но я предлагаю этого не делать, так как это заметно усложнит реализацию!
Итак, padding = 0, stride=1 всегда

Conv2d

class Conv2d:

    def __init__(self, input_channels: int, output_channels: int, kernel_size: int, bias = True):
        self.bias = bias
        self.kernel_size = (output_channels, input_channels, kernel_size, kernel_size)
        self.filter_array = np.random.randn(1, *self.kerne_size) * 0.1 # домножаем на 0.1 чтобы начальные веса были совсем маленькими
        self.bias = np.random.randn(1, output_channels, 1, 1) * 0.1
        Parameter([self, self.filter_array, self.bias * bias])

    def __call__(self, x):
        self.x = np.expand_dims(x, axis=1) # сохраняем на всякий случай значение
        # x: (batch, channels, height, width) - > (batch, 1, channels, height, width)
        return scipy.signal.fftconvolve(np.expand_dims(x, axis=1), self.filter_array, mode='valid').squeeze(axis=2) + self.bias

Как всё красиво выглядит с использованием scipy.signal.fftconvolve, но мы с вами имеем право сделать такую замену, потому что мы разобрались в деталях реализации и убедились, что можем корректно сделать реализацию такой функции. Но конечно вам никто не запрещает заменить scipy.signal.fftconvolve на нашу реализацию.
В свёрточных слоях также используют bias, как в линейных слоях, чтобы получить дополнительную степень свободы при обучении. Метод .backward() добавим чуть позже. Рекомендую также ознакомиться с данным плейлистом, с данным плейлистом и с видео

Идём дальше. Предлагаю не ограничиваться одной функцией активации!

Sigmoid

class Sigmoid:

    def __init__(self):
        self.input = None
        Parameter([self, []])

    @staticmethod
    def sigmoid_(x):
        return 1 / (1 + np.exp(-x))

    def __call__(self, x):
        self.input = x
        return self.sigmoid_(x)

    def backward(self, input_matrix):
        return (1 - self.sigmoid_(self.input)) * self.sigmoid_(self.input) * input_matrix

Leaky Relu

class Leaky_relu:

    def __init__(self, a = 0.2):
        self.a = a
        Parameter([self, []])

    def __call__(self, x):
        self.input = np.array(x, copy=True)
        return x * ((1 + np.sign(x)) / 2 + self.a * (1 + np.sign(-x)) / 2)

    def derivative(self, input_matrix):
        return ((1 + np.sign(self.input)) / 2 + self.a * (1 + np.sign(-self.input)) / 2) * input_matrix
        

Tanh

class Tanh:

    def __init__(self):
        self.input = None
        Parameter([self, []])

    @staticmethod
    def tanh_(x):
        return (np.exp(2 * x) - 1) / (np.exp(2 * x) + 1)

    def __call__(self, x):
        self.input = np.array(x, copy=True)
        return self.tanh_(x)

    def derivative(self, input_matrix):
        return 1 - self.tanh_(self.input) * input_matrix

Добавим теперь регуляризацию, она позволяет контролировать величину весов модели.

class CrossEntropyLoss:

    def __init__(self, l1_reg = 0, l2_reg = 0):
        self.l1_reg = l1_reg
        self.l2_reg = l2_reg
    def backward(self):
        self.backward_list = []
        loss = self.predicted - self.true
        for index, layer in enumerate(Parameter.layers[::-1]):
            if type(layer).__name__ == 'Linear':
	            base = (layer.x.T @ loss) / loss.shape[0]
	            l1_term = self.l1_reg * np.sign(Parameter.calling[layer][0])
	            l2_term = self.l2_reg * Parameter.calling[layer][0]
                changes_w = base + l1_term + l2_term
			...
	...

Добавим теперь новые алгоритмы градиентного спуска

Nesterov Accelerated Gradient

Очень рекомендую ознакомиться с данными материалами первое и второе

class NAG:
    def __init__(self, model, learning_rate, momentum):
        self.model = model
        self.lr = learning_rate
        self.momentum = momentum
        self.last_grad_w = None # будем хранить здесь предыдущий градиент 
        self.last_grad_b = None # будем хранить здесь предыдущий градиент 
    def step(self):
	    if self.last_grad_w == None:
            self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers) # храним для каждого слоя отдельно 
            self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers)
        
        for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
            if type(layer).__name__ in ('Linear', 'Conv2d'):
                weight, bias = self.model._constructor_Parameter.calling[layer]
                weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
                # можно сказать, что это скальзящее среднее для градиентов
                self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index]
                self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index]
                # обвновляем веса
                new_weight = weight + self.last_grad_w[index]
                new_bias = bias + self.last_grad_и[index]
                # меняем на обновленные веса
                self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]

RMSProp

Очень рекомендую ознакомиться с данными материалами первое и второе

class RMSProp:
    def __init__(self, model, learning_rate, ro):

        self.model = model
        self.lr = learning_rate
        if ro &lt; 0 or ro &gt; 1:
            raise Exception("Incorrect ro value")
        self.ro = ro
        self.grad_velocity_w = None
        self.grad_velocity_b = None

    def step(self):

        if self.grad_velocity_w== None:
            self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers)
            self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers)
        for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
            if type(layer).__name__ in ('Linear', 'Conv2d'):
                weight, bias = self.model._constructor_Parameter.calling[layer]
                weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
                
                self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2
                self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2

                new_weight = weight - self.lr * weight_gradient / np.sqrt(self.grad_velocity_w[index] + 1e-5)
                new_bias = bias - self.lr * bias_gradient / np.sqrt(self.grad_velocity_b[index] + 1e-5)
                
                self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]

Adam

Снова очень рекомендую ознакомиться с данными материалами первое и второе
Грубо говоря adam это сумма rmsprop и nag, поэтому просто «сложим» 2 реализации

class Adam:
    def __init__(self, model, learning_rate, momentum, ro):
        self.model = model
        self.lr = learning_rate
        self.momentum = momentum
        self.last_grad_w = None
        self.last_grad_b = None

        if ro &lt; 0 or ro &gt; 1:
            raise Exception("Incorrect ro value")

        self.ro = ro
        self.grad_velocity_w = None
        self.grad_velocity_b = None


    def step(self):

        if self.last_grad_w == None:
            self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers)
            self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers)
            self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers)
            self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers)

        for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
            if type(layer).__name__ in ('Linear', 'Conv2d'):
                weight, bias = self.model._constructor_Parameter.calling[layer]
                weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
                
                self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2
                self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2
                
                self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index]
                self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index]

                new_weight = weight + self.last_grad_w[index] / np.sqrt(self.grad_velocity_w[index] + 1e-5)
                new_bias = bias + self.last_grad_b[index] / np.sqrt(self.grad_velocity_b[index] + 1e-5)
                
                self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]

Обучение

Итак, мы добавили все необходимое. Осталось только добавить обновление весов для Conv2d. Сейчас будет немного сложновато, но у нас всё получится!

Я очень рекомендую ознакомиться с данными материалами, в них ребята на пальцах рассказывают про расчет обратного распространения через свёрточный слой. Я буду пользоваться их результатами

Давайте проверим корректность этих формул в общем случае с помощью scipy.signal.fftconvolve

image = np.random.randn(10, 1, 3, 7, 7) # Наша входная картинка
kernel = np.random.randn(1, 5, 3, 3, 3) # Наши фильтры
bias = np.random.randn(1, 5, 1, 1) # наш bias 
result = scipy.signal.fftconvolve(image, kernel, mode='valid') #(10, 5, 1, 5, 5) # Предположим, что это наш dl/dx - по размерности он совпадает как раз

Считаем градиент для фильтров по формуле с картинки

scipy.signal.fftconvolve(image, result, mode='valid').shape
>>> (1, 5, 3, 3, 3)

Видим, что совпадает с размерностью фильтров. Идём дальше
Считаем градиент для bias по формуле с картинки

result.sum(axis=(0, 2, 3, 4))
>>> (5,)

У нас также всё совпадает благодаря broadcasting

Считаем градиент для дальнейшего прохождения по формуле с картинки

rot = np.transpose(kernel, (0, 1, 2, 4, 3)) # поменяли местами 2 последние оси
pad = np.pad(result, ((0, 0), (0, 0), (0, 0), (2, 2), (2, 2)), 'constant', constant_values=(0)) # Заполнили 2 последние размерности 0 слева и справа

Пример использования np.pad

tmp = np.random.randn(2, 2)
np.pad(tmp, ((1, 1), (1, 1)), 'constant', constant_values=(0))
>>>array([[ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  1.93470843, -1.40590807,  0.        ],
       [ 0.        , -0.61160614,  0.15255254,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ]])
scipy.signal.fftconvolve(pad, rot, mode='valid').shape
>>> (10, 1, 3, 7, 7)

И у нас всё опять сошлось с размерностью image. Значит мы можем использовать эти формулы для расчетов градиентов. Давайте добавим наконец-то вычисления в класс CrossEntropyLoss

class CrossEntropyLoss:

    def backward(self):
        for index, layer in enumerate(Parameter.layers[::-1]):
            elif type(layer).__name__ == 'Conv2d':
                expanded_loss = np.expand_dims(loss, axis=2)
                changes_w = scipy.signal.fftconvolve(layer.x, expanded_loss, mode='valid')
                changes_b = loss.sum(axis=(0, 2, 3)) * self.bias_flag
                layer.backward_list = [changes_w, changes_b]
            
                rotated_filters = np.transpose(layer.filter_array, (0, 1, 2, 4, 3))
                pad = layer.kernel_size[-1]
                padded_loss = np.pad(expanded_loss, ((0, 0), (0, 0), (0, 0),  (pad - 1, pad - 1), (pad - 1, pad - 1)), 'constant', constant_values=(0))

                loss = scipy.signal.fftconvolve(padded_loss, rotated_filters, mode='valid').squeeze(axis=1)
                

Готово! Оказалось совсем несложно, да?)

Теперь давайте немного поменяем алгоритм нашего класса Flatten. Он будет применяться после всех сверточных слоёв чтобы превратить вектор из (batch, channels, height, width) в (batch, channels * height * width) и отправить этот вектор в линейный слой. Также добавим для него метод .backward, потому что он уже является промежуточным слоем.

class Flatten:
    def __init__(self): 
        Parameter([self, []])
    def __call__(self, x):
		self.init_shape = x.shape
        return x.reshape(self.init_shape[0], -1)
    def backward(self, input_matrix):
	    return input_matrix.reshape(self.init_shape)

class CrossEntropyLoss:
    def backward(self):
	    for index, layer in enumerate(Parameter.layers[::-1]):
	        ...
			elif type(layer).__name__ == 'Flatten':
                loss = layer.backward(loss)

Всё готово! Давайте собирать и обучать нашу свёрточную нейронку!

class SimpleNet(Module):
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=5) #28 -> 24
        self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=5) #24 -> 20
        self.conv3 = Conv2d(input_channels = 8, output_channels = 16, kernel_size=5) #20 -> 16
        self.flatten = Flatten()
        self.linear1 = Linear(input_channels=16 * 16 * 16, output_channels=200, bias=True)
        self.linear2 = Linear(input_channels=200, output_channels=50, bias=True)
        self.linear3 = Linear(input_channels=50, output_channels=10, bias=True)
        self.relu = ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        return x
        
loss_fn = CrossEntropyLoss(l1_reg=0.1, l2_reg=0.1)
model = SimpleNet()
optim = Adam(model.parameters(), learning_rate = 0.001, momentum=0.9, ro=0.9)

for i in range(5):
    for index, batch in enumerate(data_loader):
        input_x, target = batch
        input_x = input_x / 255
        input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28)
        output = model(input_x)
        loss = loss_fn(output, target)
        loss.backward()
        optim.step()

        if index % 1 == 0:
            print(loss.loss.mean(),"index:", index)
        
    print(loss.loss.mean(), "epoch:", i)

>>>7.968706513917543 index: 0
7.551666321134583 index: 1
7.278343803030227 index: 2
6.431693753643258 index: 3
5.637298937703794 index: 4
5.633115375065631 index: 5
5.00220357224557 index: 6
5.925713000464821 index: 7
5.045218328819374 index: 8
5.103600182770298 index: 9
4.13759075103372 index: 10
3.576208929225664 index: 11
3.320147584705806 index: 12
2.995831081105269 index: 13
2.8976790588242745 index: 14
2.8176135148843797 index: 15
2.7363129802870807 index: 16
2.297198679475752 index: 17
2.310984252722919 index: 18
2.2726362928678476 index: 19
2.11617617635163 index: 20
2.23533730800758 index: 21
2.064323468827378 index: 22
1.9638605959573385 index: 23
2.1304361443736948 index: 24
2.0881922952210124 index: 25
1.9733052417437202 index: 26
2.099729829345195 index: 27
2.0752790945980193 index: 28

Отлично, модель кое-как обучается, только вот значение loss-функции в какой-то момент перестает изменятся. Возможно причиной этого может быть слишком сложная задача или слишком сложная модель (в одном только linear1 — 16*16*16*200 = 800_000 весов)

Давайте упростим немного задачу нашей модели, введя еще один слой — MaxPool

MaxPool

Pooling — это операция уменьшения карт активации. Делается она очень просто:

  • Берем карту активации, делим ее на квадраты размера два на два;

  • Из каждого квадрата два на два берем только одно число: максимальное число в этом квадрате;

  • Записываем эти числа вместо каждого квадрата два на два. Так мы из карты активации размера (n x n) получаем карту активации размера (n/2 x n/2)

Такая операция называется MaxPooling с ядром размера 2. Ядро размера 2, потому что мы делили карту активации на квадраты размера 2х2.

class MaxPool:

    def __init__(self, kernel_size: tuple):
        self.kernel_size = kernel_size
        Parameter([self, []])

    def __call__(self, x):
        array = x.copy()
        result_full = np.zeros((array.shape[0], array.shape[1], int(array.shape[2] / self.kernel_size[0]), int(array.shape[3] / self.kernel_size[1])))
        for k in range(array.shape[0]):
            for m in range(array.shape[1]):
                result = []
                self.i = 0
                while self.i < array[k][m].shape[0] - self.kernel_size[0] + 1:
                    self.j = 0
                    while self.j < array[k][m].shape[1] - self.kernel_size[1] + 1:
                        result.append(np.max(array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]]))
                        array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]] = (array[k][m][self.i:self.i + self.kernel_size[0], self.j: self.j + self.kernel_size[1]]) * [array[k][m][self.i:self.i + self.kernel_size[0],
                                                                                                      self.j:self.j +self.kernel_size[1]] == np.max(array[k][m][self.i:self.i +self.kernel_size[0], self.j:self.j +self.kernel_size[1]])]

                        self.j += self.kernel_size[1]
                    self.i += self.kernel_size[0]

                result_full[k][m] = np.array(result).reshape(int(array[k][m].shape[0] / self.kernel_size[0]), int(array[k][m].shape[1] / self.kernel_size[1]))

        self.array = array
        return result_full

Как видим, логика простая, а реализация не очень. Давайте заменим на более короткую функцию skimage.measure.block_reduce и сравним результаты

import skimage

class MaxPool2d:
    def __init__(self, kernel_size: tuple):
        self.kernel_size = kernel_size
        Parameter([self, []])

    def __call__(self, x):
	    self.x = np.array(x, copy=True)
	    return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.max)

class MinPool2d:
    def __init__(self, kernel_size: tuple):
        self.kernel_size = kernel_size
        Parameter([self, []])

    def __call__(self, x):
	    self.x = np.array(x, copy=True)
	    return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.min)
a = np.random.randn(10, 20, 30, 30)
maxpool1 = MaxPool1((2, 2)) # Наша реализация
maxpool2 = MaxPool2((2, 2)) # Реализация Scimage
np.allclose(maxpool1(a), maxpool2(a))
>>>True
a = np.random.randn(4, 19, 20, 20)
maxpool1 = MaxPool1((5, 5)) # Наша реализация
maxpool2 = MaxPool2((5, 5)) # Реализация Scimage
np.allclose(maxpool1(a), maxpool2(a))
>>>True

Раз они одинаковые, давайте оставим более короткую реализацию. Углубляться в свою реализацию я не стану, так как она довольная простая, просто немного запутанная.

Теперь добавим backprop для max-min pool. Логика простая, инициализируем нулевую матрицу, и на тех местах, где было наибольшее ( или наименьше в случае MinPool) значение в окне меняем 0 на 1. Всё просто, но реализация всё также немного запутанная! Я верю, что можно сделать более эффективно, но пока не хочу останавливаться на этом!


class CrossEntropyLoss:
    def backward(self):
	    for index, layer in enumerate(Parameter.layers[::-1]):
	        ...
			elif type(layer).__name__ == 'MaxPool2d':
			                new_shape = np.zeros(layer.x.shape)
			                for k in range(layer.x.shape[0]):
			                    for m in range(layer.x.shape[1]):
			                        inx_ = 0
			                        inx__ = 0
			                        layer.i = 0
			                        while layer.i &lt; layer.x[k][m].shape[0] - layer.kernel_size[0] + 1:
			                            layer.j = 0
			                            inx__ = 0
			                            while layer.j &lt; layer.x[k][m].shape[1] - layer.kernel_size[1] + 1:
			                                new_shape[k][m][layer.i:layer.i + layer.kernel_size[0], layer.j:layer.j + layer.kernel_size[1]] = loss[k][m][inx_][inx__]
			                                inx__ += 1
			                                layer.j += layer.kernel_size[1]
			
			                            inx_ += 1
			                            layer.i += layer.kernel_size[0]
			
			                loss = np.squeeze([layer.x &gt; 0] * new_shape, axis=0)
			# Для minpool аналогично, только поменять знак

Теперь собираем

class SimpleConvNet(Module):
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26
        self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13
        self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10
        self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5
        self.flatten = Flatten()
        self.linear1 = Linear(input_channels= 5 * 5 * 8, output_channels=50, bias=True)
        self.linear2 = Linear(input_channels=50, output_channels=10, bias=True)
        self.relu = ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool2(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x

loss_fn = CrossEntropyLoss(l1_reg=0.001, l2_reg=0.001)
model = SimpleConvNet()
optim = Adam(model.parameters(), learning_rate = 5e-3, momentum=0.9, ro=0.9)

for i in range(5):
    y_pred_list = []
    y_true_list = []
    for index, batch in enumerate(data_loader):
        input_x, target = batch
        input_x = input_x / 255
        input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -&gt; (64, 1, 28, 28)
        output = model(input_x)
        loss = loss_fn(output, target)
        loss.backward()
        optim.step()

        y_pred_list.extend(output)
        y_true_list.extend(np.int_(np.arange(0, 10) == target))

        if index % 20 == 0:
            print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
        
    print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")

>>>loss: 2.94 index: 0 acc: 0.08
loss: 2.02 index: 20 acc: 0.23
loss: 1.76 epoch: 0 acc: 0.31
loss: 1.67 index: 0 acc: 0.53
loss: 1.06 index: 20 acc: 0.60
loss: 1.05 index: 40 acc: 0.63
loss: 0.98 index: 60 acc: 0.65
loss: 1.05 epoch: 1 acc: 0.65
loss: 0.98 index: 0 acc: 0.72
loss: 0.92 index: 20 acc: 0.64
loss: 1.27 index: 40 acc: 0.62
loss: 1.04 index: 60 acc: 0.62
loss: 0.97 epoch: 2 acc: 0.63
loss: 0.73 index: 0 acc: 0.77

Отлично! У нас всё обучается!

Теперь давайте добавим еще один метод регуляризации BatchNorm. Будем применять его после линейных слоев. Вычисление производной для BatchNorm. Здесь опять же просто механическая работа в виде следования формулам!

class BatchNorm2d:

    def __init__(self, size):
        self.conv = False
        Parameter([self, np.ones((size)), np.ones((size))])

    def __call__(self, x):
		self.mean = np.mean(x, axis=(0))
		self.std = np.std(x, axis=(0)) + 0.0001
		self.x = (x - self.mean) / self.std
		return Parameter.calling[self][0] * self.x + Parameter.calling[self][1]


class CrossEntropyLoss:
    def backward(self):
		    for index, layer in enumerate(Parameter.layers[::-1]):
	        ...
				layer.backward_list = [np.sum(loss * layer.x, axis = 0), np.sum(loss, axis = 0)]
				dl_dx = loss * Parameter.calling[layer][0]
				dl_dstd = np.sum(dl_dx * (layer.x * layer.std) * (-1/2) / (layer.std ** 3), axis = 0)
				dl_dmean = np.sum(dl_dx * (- 1 / layer.std), axis = 0) + dl_dstd * (np.sum(-2 * layer.x * layer.std, axis = 0) / len(layer.x))
				loss = dl_dx / layer.std + dl_dstd * 2 * (layer.x * layer.std) / len(layer.x) + dl_dmean / len(layer.x)

Обучаем

class SimpleConvNet(Module):
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26
        self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13
        self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10
        self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5
        self.flatten = Flatten()
        self.linear1 = Linear(input_channels=5 * 5 * 8, output_channels=50, bias=True)
        self.bn = BatchNorm2d(50)
        self.linear2 = Linear(input_channels=50, output_channels=10, bias=True)
        self.relu = ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool2(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x

loss_fn = CrossEntropyLoss(l1_reg=0.005, l2_reg=0.005)
model = SimpleConvNet()
optim = Adam(model.parameters(), learning_rate = 1e-3, momentum=0.99, ro=0.99)

for i in range(5):
    y_pred_list = []
    y_true_list = []
    for index, batch in enumerate(data_loader):
        input_x, target = batch
        input_x = input_x / 255
        input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -&gt; (64, 1, 28, 28)
        output = model(input_x)
        loss = loss_fn(output, target)
        loss.backward()
        optim.step()

        y_pred_list.extend(output)
        y_true_list.extend(np.int_(np.arange(0, 10) == target))

        if index % 20 == 0:
            print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
        
    print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")

>>>loss: 4.56 index: 0 acc: 0.12
loss: 0.66 index: 20 acc: 0.59
loss: 0.74 epoch: 0 acc: 0.65
loss: 0.59 index: 0 acc: 0.78
loss: 1.24 index: 20 acc: 0.74
loss: 0.55 index: 40 acc: 0.77
loss: 0.54 index: 60 acc: 0.78
loss: 1.45 epoch: 1 acc: 0.78
loss: 0.49 index: 0 acc: 0.84
...

Мы сделали большую работу! Добавили много слоев, реализовали несколько алгоритмов градиентого спуска и обучили свёрточную модель для задачи классификации изображений!

Вот и на этом вторая часть подошла к концу. Полный код можно найти тут

В третьей части я планирую:

  • представить аналог pytorch.tensor() 

  • перевести все вычисления на динамический вычислительный граф 

  • добавить EmbeddingLayerNormModuleList 

  • провести рефакторинг библиотеки 

  • добавить перенос вычислений на gpu

  • написать на библиотеке gpt2-1.5В и запустить его

Первая версия библиотеки

Вторая версия библиотеки

GPT-2 на этой библиотеке

Комментарии (1)


  1. Dokker24
    26.12.2024 05:19

    Статья очень понравилась, но почему бы не следовать канонам и использовать динамический вычислительный граф как в Pytorch? Сразу извиняюсь, если вопрос неуместный