PyTorch — это мощный и гибкий фреймворк для машинного обучения, широко используемый для создания нейронных сетей. Он особенно популярен благодаря простоте использования, динамическим вычислительным графам и богатой экосистеме инструментов для обучения моделей. Для использования этого фреймворка, часто достаточно поверхностно понимать работу алгоритмов машинного обучения.
Но Андрей Карпаты, известный исследователь в области ИИ, считает, что реализация алгоритмов с нуля позволяет понять их суть и детали работы, что сложно осознать, используя только готовые библиотеки. Это помогает развить интуицию для дальнейшего применения и улучшения методов. Андрей посвящает много собственного времени, чтобы объяснять ключевые принципы работы нейросетей в своих блогах и на своём ютуб-канале. Он также не раз подчеркивал, что на его курсе в Cтэнфорде есть задачи по реализации различных алгоритмов, например, обратное распространение.
Я хотел бы посвятить данную статью этой идеи, потому что мне самому особенно интересно копаться в алгоритмах глубокого обучения. Эта статья продолжение первой статьи
Сегодня мы:
добавим CNN, BatchNorm, MaxPool, MinPool
реализуем RMSProp, NaG, Adam
добавим регуляризацию в loss-функцию
добавим новые функции активации
напишем DataLoader
Поехали!
Мы остановились на том, что сделали достаточно похожую на PyTorch обертку.
loss_fn = CrossEntropyLoss()
model = SimpleNet()
optim = SGD(model.parameters(), learning_rate = 0.01)
for i in range(100):
output = model(input_x)
loss = loss_fn(output, target_x)
loss.backward()
optim.step()
Давайте теперь попытаемся обучить на каких-нибудь реальных данных! Для этой задачи нам подойдет набор рукописных цифр MNIST. Но как нам эффективно обработать весь датасет? Вспомним как это делается в PyTorch. Мы создаем объект класса DataLoader - который позволяет разбить весь датасет на батчи и работает как итерируемый объект. Давайте продолжим создавать собственные реализации.
Давайте хранить ссылку на наш набор в переменной dataset
. Например
import pandas as pd
import numpy as np
data_pd = pd.read_csv(path)
dataset = np.array(data_pd).astype(float)
np.random.shuffle(dataset) # перемешаем датасет
dataset.shape
>>> (5000, 785) # 1 значение класса + 784 значения пискелей
Dataloader
class DataLoader():
def __init__(self, data, batch_size=64, shuffle=True, flatten = True):
self.data = data
self.index = 0 # индекс нужен для корректной итерации
self.items = [] # здесь будем хранить наборы батчей
self.flatten = flatten
# определяем сколько раз сможем проитерировать весь набор
self.max = data.shape[0] // batch_size + (1 if data.shape[0] % batch_size != 0 else 0)
if shuffle == True:
# мешаем набор если захотел пользователь
self.data = np.random.permutation(self.data)
# создаем список всех батчей
for _ in range(self.max):
self.items.append(self.data[batch_size * _: batch_size * (_ + 1)])
# превращаем в итерируемый объект
def __iter__(self):
return self
# для циклов for и while
def __next__(self):
if self.index < self.max:
value = self.items[self.index] # получаем батч
self.index += 1
if self.flatten:
# возвращаем в нашем случае либо ((64, 784), (64, 1))
return value[:, 1:], value[:, 0].reshape(-1, 1)
else:
# либо ((64, 28, 28), (64, 1)), то есть как изображение
return value[:, 1:].reshape(value.shape[0], 28, 28), value[:, 0].reshape(-1, 1)
else:
self.index = 0
raise StopIteration # выходим из цикла
Теперь вся загрузка набора данных по батчам выглядит так:
data_loader = DataLoader(dataset, batch_size=64, shuffle=True, flatten=True)
Давайте посмотрим на наш батч.
for x, target in dataloader:
break
target.shape
>>> (64, 1)
Странно, у нас вроде 10 классов, а размер почему-то 1. Заглянем внутрь
target[0]
>>> array([8.])
Это номер класса, а мы ведь ждали, что будет [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
Тогда давайте обработаем такой случай внутри нашего класса CrossEntropyLoss
class CrossEntropyLoss:
def __call__(self, logits, true):
predicted = np.exp(logits) / np.sum(np.exp(logits), axis=1).reshape(-1, 1) # softmax
self.predicted = np.array(predicted, copy=True) # сделаем копию входных матрицы для дальнейших вычислений
### Добавили эти строчки
number_of_classes = predicted.shape[1]
self.true = np.int_(np.arange(0, number_of_classes) == true)
###
# вычисляем значение лосс-функции прямо по формуле
self.loss = -1 * np.sum(self.true * np.log(self.predicted + 1e-5), axis=1)
return self
Теперь попробуем обучить нашу нейронку. Создадим модель
class SimpleNet(Module):
def __init__(self):
super().__init__()
self.linear1 = Linear(input_channels=784, output_channels=200, bias=True)
self.linear2 = Linear(input_channels=200, output_channels=50, bias=True)
self.linear3 = Linear(input_channels=50, output_channels=10, bias=True)
self.relu = ReLU()
def forward(self, x):
x = self.linear1(x)
x = self.relu(x)
x = self.linear2(x)
x = self.relu(x)
x = self.linear3(x)
return x
Инициализируем всё необходимое
loss_fn = CrossEntropyLoss()
model = SimpleNet()
optim = SGD(model.parameters(), learning_rate = 0.01)
Для нашего набора в 5000 экземпляров получится 5000 // 64 = 78 батчей
. Пройдемся, например, 5 раз по нашему датасету, то есть у нас будет 5 эпох.
for i in range(5):
for batch in data_loader:
input_x, target = batch
input_x = input_x / 255 # нормируем на [0, 1], иначе обучение может стать нестабильным
output = model(input_x)
loss = loss_fn(output, target)
loss.backward()
optim.step()
print(loss.loss.mean(), i) # Берем среднее значение лосса по батчу
>>>0.622 0
0.294 1
0.166 2
0.101 3
0.070 4
Значение loss-функции уменьшается, значит наша модель обучается на нашем наборе данных!
Давайте пойдем дальше и добавим новые слои! В первую очередь я хотел бы добавить свёрточные слои, так как именно они хорошо справляются с сложными изображениями. Только лишь на линейных слоях мы далеко не уйдем.
CNN
Операция свертки состоит в сворачивании изображения с помощью фильтра. Фильтр или kernel — это тоже матрица чисел.
Свертка изображения размера 4×4 фильтром размера 3×3: Получ енный результат называется картой активации. Свертка картинки шаг за шагом:
Заметим, что размер карты активации после свертки стал меньше, чем размер изначального изображения. Общая формула размера карты активации такая:
m = (i − f + 2 * padding) / stride + 1
,
где:
m
— размер карты активации;i
— размер изображения;f
— размер фильтра.padding
— отступ от краяstride
— шаг фильтра
Рекомендую ознакомиться с хорошим ноутбуком, чтобы понять интуицию свёрток
Как будет выглядеть свёрточная сеть для классификации картинок:
У нас уже есть слои Flatten
и Linear
. Остается только Conv2d
.
Давайте попробуем реализовать простую операцию свертки одной картинки и одного фильтра.
image = np.array([[0, 50, 0, 29],
[0, 80, 31, 2],
[33, 90, 0, 75],
[0, 9, 0, 95]
])
kernel = np.ones((3, 3))
image, kernel
>>>array([[ 0, 50, 0, 29],
[ 0, 80, 31, 2],
[33, 90, 0, 75],
[ 0, 9, 0, 95]]),
array([[1., 1., 1.],
[1., 1., 1.],
[1., 1., 1.]])
i = image.shape[0]
f = kernel.shape[0]
padding = 0
step = 1
m = (i - f + 2*padding) // step + 1
>>> 2
new_image = np.zeros((m, m))
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[y][x] = np.sum(image[start_y:end_y, start_x:end_x] * kernel)
new_image
>>> array([[284., 357.],
[243., 382.]])
Проверим с библиотечной реализацией.
import scipy.signal
scipy.signal.fftconvolve(image, kernel, mode='valid')
>>>array([[284., 357.],
[243., 382.]])
В дальнейшем мы будем использовать для операции свёртки именно scipy.signal.fftconvolve
, потому что это весьма ресурсоемкая операция, а данный метод позволяет посчитать тоже самое быстрее с помощью быстрого преобразования Фурье!
Давайте попробуем цветную картинку, то есть не (height, width)
, а (channels, height, width)
Спойлер: есть два способа производить свертку многоканальной картинки
Мы будем использовать первый метод, так как он наиболее часто используемый.
Сгенерируем картинку и фильтр
image = np.random.randn(3, 7, 7)
kernel = np.ones((3, 3, 3))
Реализация операции почти никак не поменяется
i = image.shape[1]
f = kernel.shape[1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
new_image = np.zeros((m, m))
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[y][x] = np.sum(image[:, start_y:end_y, start_x:end_x] * kernel)
Сравниваем!
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid'))
>>> True
Круто! Теперь давайте сделаем операцию свёртки на нескольких картинках одним фильтром. Опять совсем немного усложним код
image = np.random.randn(10, 3, 7, 7)
kernel = np.ones((1, 3, 3, 3))
i = image.shape[-1]
f = kernel.shape[-1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
number_of_images = image.shape[0]
new_image = np.zeros((number_of_images, m, m))
for image_n in range(number_of_images):
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[image_n][y][x] = np.sum(image[image_n,:, start_y:end_y, start_x:end_x] * kernel)
Проверим
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1))
>>> True
Ура, получилось, теперь давайте наоборот, пройдемся несколькими фильтрами по одной картинке!
image = np.random.randn(1, 3, 7, 7)
kernel = np.ones((5, 3, 3, 3))
i = image.shape[-1]
f = kernel.shape[-1]
step = 1
m = (i-f) // step +1
number_of_kernels = kernel.shape[0]
new_image = np.zeros((number_of_kernels, m, m))
for kernel_n in range(number_of_kernels):
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[kernel_n][y][x] = np.sum(image[0, :, start_y:end_y, start_x:end_x] * kernel[kernel_n])
Снова сравниваем!
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1))
>>> True
Это очень круто! А теперь давайте соберём это всё и реализуем операцию свертки нескольких фильтров на нескольких картинках!
image = np.random.randn(10, 1, 3, 7, 7)
kernel = np.ones((1, 5, 3, 3, 3))
i = image.shape[-1]
f = kernel.shape[-1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
number_of_kernels = kernel.shape[1]
number_of_images = image.shape[0]
new_image = np.zeros((number_of_images, number_of_kernels, m, m))
for image_n in range(number_of_images):
for kernel_n in range(number_of_kernels):
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[image_n][kernel_n][y][x] = np.sum(image[image_n, 0, :, start_y:end_y, start_x:end_x] * kernel[0, kernel_n])
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=2))
>>> True
Очень крутой результат! Мы с вами смогли полностью реализовать одну из базовых и самых важных операций в нейронных сетях.
В PyTorch за операцию свертки отвечают слои nn.Conv1d
, nn.Conv2d
, nn.Conv3d
- логика у них у всех одинаковая, только немного отличается обработка. Например, мы реализовали nn.Conv2d
, если бы мы делали метод 2
на предыдущей картинке, то получили бы nn.Conv1d
. Теперь давайте соберем все знания в один класс.
Note!
Давайте условимся, что картинка в этот слой будет подаваться в виде (batch, channels, height, width
)
Например, 1 черно-белая картинка должна иметь формат (1, 1, 28, 28)
Также для операции в PyTorch можно использовать разные значения padding
и stride
. Можно сделать реализацию операции свёртки учитывая эти параметры, но я предлагаю этого не делать, так как это заметно усложнит реализацию!
Итак, padding = 0, stride=1
всегда
Conv2d
class Conv2d:
def __init__(self, input_channels: int, output_channels: int, kernel_size: int, bias = True):
self.bias = bias
self.kernel_size = (output_channels, input_channels, kernel_size, kernel_size)
self.filter_array = np.random.randn(1, *self.kerne_size) * 0.1 # домножаем на 0.1 чтобы начальные веса были совсем маленькими
self.bias = np.random.randn(1, output_channels, 1, 1) * 0.1
Parameter([self, self.filter_array, self.bias * bias])
def __call__(self, x):
self.x = np.expand_dims(x, axis=1) # сохраняем на всякий случай значение
# x: (batch, channels, height, width) - > (batch, 1, channels, height, width)
return scipy.signal.fftconvolve(np.expand_dims(x, axis=1), self.filter_array, mode='valid').squeeze(axis=2) + self.bias
Как всё красиво выглядит с использованием scipy.signal.fftconvolve
, но мы с вами имеем право сделать такую замену, потому что мы разобрались в деталях реализации и убедились, что можем корректно сделать реализацию такой функции. Но конечно вам никто не запрещает заменить scipy.signal.fftconvolve
на нашу реализацию.
В свёрточных слоях также используют bias, как в линейных слоях, чтобы получить дополнительную степень свободы при обучении. Метод .backward()
добавим чуть позже. Рекомендую также ознакомиться с данным плейлистом, с данным плейлистом и с видео
Идём дальше. Предлагаю не ограничиваться одной функцией активации!
Sigmoid
class Sigmoid:
def __init__(self):
self.input = None
Parameter([self, []])
@staticmethod
def sigmoid_(x):
return 1 / (1 + np.exp(-x))
def __call__(self, x):
self.input = x
return self.sigmoid_(x)
def backward(self, input_matrix):
return (1 - self.sigmoid_(self.input)) * self.sigmoid_(self.input) * input_matrix
Leaky Relu
class Leaky_relu:
def __init__(self, a = 0.2):
self.a = a
Parameter([self, []])
def __call__(self, x):
self.input = np.array(x, copy=True)
return x * ((1 + np.sign(x)) / 2 + self.a * (1 + np.sign(-x)) / 2)
def derivative(self, input_matrix):
return ((1 + np.sign(self.input)) / 2 + self.a * (1 + np.sign(-self.input)) / 2) * input_matrix
Tanh
class Tanh:
def __init__(self):
self.input = None
Parameter([self, []])
@staticmethod
def tanh_(x):
return (np.exp(2 * x) - 1) / (np.exp(2 * x) + 1)
def __call__(self, x):
self.input = np.array(x, copy=True)
return self.tanh_(x)
def derivative(self, input_matrix):
return 1 - self.tanh_(self.input) * input_matrix
Добавим теперь регуляризацию, она позволяет контролировать величину весов модели.
class CrossEntropyLoss:
def __init__(self, l1_reg = 0, l2_reg = 0):
self.l1_reg = l1_reg
self.l2_reg = l2_reg
def backward(self):
self.backward_list = []
loss = self.predicted - self.true
for index, layer in enumerate(Parameter.layers[::-1]):
if type(layer).__name__ == 'Linear':
base = (layer.x.T @ loss) / loss.shape[0]
l1_term = self.l1_reg * np.sign(Parameter.calling[layer][0])
l2_term = self.l2_reg * Parameter.calling[layer][0]
changes_w = base + l1_term + l2_term
...
...
Добавим теперь новые алгоритмы градиентного спуска
Nesterov Accelerated Gradient
Очень рекомендую ознакомиться с данными материалами первое и второе
class NAG:
def __init__(self, model, learning_rate, momentum):
self.model = model
self.lr = learning_rate
self.momentum = momentum
self.last_grad_w = None # будем хранить здесь предыдущий градиент
self.last_grad_b = None # будем хранить здесь предыдущий градиент
def step(self):
if self.last_grad_w == None:
self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers) # храним для каждого слоя отдельно
self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers)
for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
if type(layer).__name__ in ('Linear', 'Conv2d'):
weight, bias = self.model._constructor_Parameter.calling[layer]
weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
# можно сказать, что это скальзящее среднее для градиентов
self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index]
self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index]
# обвновляем веса
new_weight = weight + self.last_grad_w[index]
new_bias = bias + self.last_grad_и[index]
# меняем на обновленные веса
self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
RMSProp
Очень рекомендую ознакомиться с данными материалами первое и второе
class RMSProp:
def __init__(self, model, learning_rate, ro):
self.model = model
self.lr = learning_rate
if ro < 0 or ro > 1:
raise Exception("Incorrect ro value")
self.ro = ro
self.grad_velocity_w = None
self.grad_velocity_b = None
def step(self):
if self.grad_velocity_w== None:
self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers)
self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers)
for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
if type(layer).__name__ in ('Linear', 'Conv2d'):
weight, bias = self.model._constructor_Parameter.calling[layer]
weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2
self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2
new_weight = weight - self.lr * weight_gradient / np.sqrt(self.grad_velocity_w[index] + 1e-5)
new_bias = bias - self.lr * bias_gradient / np.sqrt(self.grad_velocity_b[index] + 1e-5)
self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
Adam
Снова очень рекомендую ознакомиться с данными материалами первое и второе
Грубо говоря adam
это сумма rmsprop
и nag
, поэтому просто «сложим» 2 реализации
class Adam:
def __init__(self, model, learning_rate, momentum, ro):
self.model = model
self.lr = learning_rate
self.momentum = momentum
self.last_grad_w = None
self.last_grad_b = None
if ro < 0 or ro > 1:
raise Exception("Incorrect ro value")
self.ro = ro
self.grad_velocity_w = None
self.grad_velocity_b = None
def step(self):
if self.last_grad_w == None:
self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers)
self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers)
self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers)
self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers)
for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
if type(layer).__name__ in ('Linear', 'Conv2d'):
weight, bias = self.model._constructor_Parameter.calling[layer]
weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1]
self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2
self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2
self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index]
self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index]
new_weight = weight + self.last_grad_w[index] / np.sqrt(self.grad_velocity_w[index] + 1e-5)
new_bias = bias + self.last_grad_b[index] / np.sqrt(self.grad_velocity_b[index] + 1e-5)
self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
Обучение
Итак, мы добавили все необходимое. Осталось только добавить обновление весов для Conv2d
. Сейчас будет немного сложновато, но у нас всё получится!
Я очень рекомендую ознакомиться с данными материалами, в них ребята на пальцах рассказывают про расчет обратного распространения через свёрточный слой. Я буду пользоваться их результатами
Давайте проверим корректность этих формул в общем случае с помощью scipy.signal.fftconvolve
image = np.random.randn(10, 1, 3, 7, 7) # Наша входная картинка
kernel = np.random.randn(1, 5, 3, 3, 3) # Наши фильтры
bias = np.random.randn(1, 5, 1, 1) # наш bias
result = scipy.signal.fftconvolve(image, kernel, mode='valid') #(10, 5, 1, 5, 5) # Предположим, что это наш dl/dx - по размерности он совпадает как раз
Считаем градиент для фильтров по формуле с картинки
scipy.signal.fftconvolve(image, result, mode='valid').shape
>>> (1, 5, 3, 3, 3)
Видим, что совпадает с размерностью фильтров. Идём дальше
Считаем градиент для bias по формуле с картинки
result.sum(axis=(0, 2, 3, 4))
>>> (5,)
У нас также всё совпадает благодаря broadcasting
Считаем градиент для дальнейшего прохождения по формуле с картинки
rot = np.transpose(kernel, (0, 1, 2, 4, 3)) # поменяли местами 2 последние оси
pad = np.pad(result, ((0, 0), (0, 0), (0, 0), (2, 2), (2, 2)), 'constant', constant_values=(0)) # Заполнили 2 последние размерности 0 слева и справа
Пример использования np.pad
tmp = np.random.randn(2, 2)
np.pad(tmp, ((1, 1), (1, 1)), 'constant', constant_values=(0))
>>>array([[ 0. , 0. , 0. , 0. ],
[ 0. , 1.93470843, -1.40590807, 0. ],
[ 0. , -0.61160614, 0.15255254, 0. ],
[ 0. , 0. , 0. , 0. ]])
scipy.signal.fftconvolve(pad, rot, mode='valid').shape
>>> (10, 1, 3, 7, 7)
И у нас всё опять сошлось с размерностью image
. Значит мы можем использовать эти формулы для расчетов градиентов. Давайте добавим наконец-то вычисления в класс CrossEntropyLoss
class CrossEntropyLoss:
def backward(self):
for index, layer in enumerate(Parameter.layers[::-1]):
elif type(layer).__name__ == 'Conv2d':
expanded_loss = np.expand_dims(loss, axis=2)
changes_w = scipy.signal.fftconvolve(layer.x, expanded_loss, mode='valid')
changes_b = loss.sum(axis=(0, 2, 3)) * self.bias_flag
layer.backward_list = [changes_w, changes_b]
rotated_filters = np.transpose(layer.filter_array, (0, 1, 2, 4, 3))
pad = layer.kernel_size[-1]
padded_loss = np.pad(expanded_loss, ((0, 0), (0, 0), (0, 0), (pad - 1, pad - 1), (pad - 1, pad - 1)), 'constant', constant_values=(0))
loss = scipy.signal.fftconvolve(padded_loss, rotated_filters, mode='valid').squeeze(axis=1)
Готово! Оказалось совсем несложно, да?)
Теперь давайте немного поменяем алгоритм нашего класса Flatten
. Он будет применяться после всех сверточных слоёв чтобы превратить вектор из (batch, channels, height, width)
в (batch, channels * height * width)
и отправить этот вектор в линейный слой. Также добавим для него метод .backward
, потому что он уже является промежуточным слоем.
class Flatten:
def __init__(self):
Parameter([self, []])
def __call__(self, x):
self.init_shape = x.shape
return x.reshape(self.init_shape[0], -1)
def backward(self, input_matrix):
return input_matrix.reshape(self.init_shape)
class CrossEntropyLoss:
def backward(self):
for index, layer in enumerate(Parameter.layers[::-1]):
...
elif type(layer).__name__ == 'Flatten':
loss = layer.backward(loss)
Всё готово! Давайте собирать и обучать нашу свёрточную нейронку!
class SimpleNet(Module):
def __init__(self):
super().__init__()
self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=5) #28 -> 24
self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=5) #24 -> 20
self.conv3 = Conv2d(input_channels = 8, output_channels = 16, kernel_size=5) #20 -> 16
self.flatten = Flatten()
self.linear1 = Linear(input_channels=16 * 16 * 16, output_channels=200, bias=True)
self.linear2 = Linear(input_channels=200, output_channels=50, bias=True)
self.linear3 = Linear(input_channels=50, output_channels=10, bias=True)
self.relu = ReLU()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.relu(x)
x = self.conv3(x)
x = self.relu(x)
x = self.flatten(x)
x = self.linear1(x)
x = self.relu(x)
x = self.linear2(x)
x = self.relu(x)
x = self.linear3(x)
return x
loss_fn = CrossEntropyLoss(l1_reg=0.1, l2_reg=0.1)
model = SimpleNet()
optim = Adam(model.parameters(), learning_rate = 0.001, momentum=0.9, ro=0.9)
for i in range(5):
for index, batch in enumerate(data_loader):
input_x, target = batch
input_x = input_x / 255
input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28)
output = model(input_x)
loss = loss_fn(output, target)
loss.backward()
optim.step()
if index % 1 == 0:
print(loss.loss.mean(),"index:", index)
print(loss.loss.mean(), "epoch:", i)
>>>7.968706513917543 index: 0
7.551666321134583 index: 1
7.278343803030227 index: 2
6.431693753643258 index: 3
5.637298937703794 index: 4
5.633115375065631 index: 5
5.00220357224557 index: 6
5.925713000464821 index: 7
5.045218328819374 index: 8
5.103600182770298 index: 9
4.13759075103372 index: 10
3.576208929225664 index: 11
3.320147584705806 index: 12
2.995831081105269 index: 13
2.8976790588242745 index: 14
2.8176135148843797 index: 15
2.7363129802870807 index: 16
2.297198679475752 index: 17
2.310984252722919 index: 18
2.2726362928678476 index: 19
2.11617617635163 index: 20
2.23533730800758 index: 21
2.064323468827378 index: 22
1.9638605959573385 index: 23
2.1304361443736948 index: 24
2.0881922952210124 index: 25
1.9733052417437202 index: 26
2.099729829345195 index: 27
2.0752790945980193 index: 28
Отлично, модель кое-как обучается, только вот значение loss-функции в какой-то момент перестает изменятся. Возможно причиной этого может быть слишком сложная задача или слишком сложная модель (в одном только linear1 — 16*16*16*200 = 800_000 весов
)
Давайте упростим немного задачу нашей модели, введя еще один слой — MaxPool
MaxPool
Pooling — это операция уменьшения карт активации. Делается она очень просто:
Берем карту активации, делим ее на квадраты размера два на два;
Из каждого квадрата два на два берем только одно число: максимальное число в этом квадрате;
Записываем эти числа вместо каждого квадрата два на два. Так мы из карты активации размера (n x n) получаем карту активации размера (n/2 x n/2)
Такая операция называется MaxPooling с ядром размера 2. Ядро размера 2, потому что мы делили карту активации на квадраты размера 2х2.
class MaxPool:
def __init__(self, kernel_size: tuple):
self.kernel_size = kernel_size
Parameter([self, []])
def __call__(self, x):
array = x.copy()
result_full = np.zeros((array.shape[0], array.shape[1], int(array.shape[2] / self.kernel_size[0]), int(array.shape[3] / self.kernel_size[1])))
for k in range(array.shape[0]):
for m in range(array.shape[1]):
result = []
self.i = 0
while self.i < array[k][m].shape[0] - self.kernel_size[0] + 1:
self.j = 0
while self.j < array[k][m].shape[1] - self.kernel_size[1] + 1:
result.append(np.max(array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]]))
array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]] = (array[k][m][self.i:self.i + self.kernel_size[0], self.j: self.j + self.kernel_size[1]]) * [array[k][m][self.i:self.i + self.kernel_size[0],
self.j:self.j +self.kernel_size[1]] == np.max(array[k][m][self.i:self.i +self.kernel_size[0], self.j:self.j +self.kernel_size[1]])]
self.j += self.kernel_size[1]
self.i += self.kernel_size[0]
result_full[k][m] = np.array(result).reshape(int(array[k][m].shape[0] / self.kernel_size[0]), int(array[k][m].shape[1] / self.kernel_size[1]))
self.array = array
return result_full
Как видим, логика простая, а реализация не очень. Давайте заменим на более короткую функцию skimage.measure.block_reduce
и сравним результаты
import skimage
class MaxPool2d:
def __init__(self, kernel_size: tuple):
self.kernel_size = kernel_size
Parameter([self, []])
def __call__(self, x):
self.x = np.array(x, copy=True)
return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.max)
class MinPool2d:
def __init__(self, kernel_size: tuple):
self.kernel_size = kernel_size
Parameter([self, []])
def __call__(self, x):
self.x = np.array(x, copy=True)
return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.min)
a = np.random.randn(10, 20, 30, 30)
maxpool1 = MaxPool1((2, 2)) # Наша реализация
maxpool2 = MaxPool2((2, 2)) # Реализация Scimage
np.allclose(maxpool1(a), maxpool2(a))
>>>True
a = np.random.randn(4, 19, 20, 20)
maxpool1 = MaxPool1((5, 5)) # Наша реализация
maxpool2 = MaxPool2((5, 5)) # Реализация Scimage
np.allclose(maxpool1(a), maxpool2(a))
>>>True
Раз они одинаковые, давайте оставим более короткую реализацию. Углубляться в свою реализацию я не стану, так как она довольная простая, просто немного запутанная.
Теперь добавим backprop
для max-min pool. Логика простая, инициализируем нулевую матрицу, и на тех местах, где было наибольшее ( или наименьше в случае MinPool
) значение в окне меняем 0
на 1
. Всё просто, но реализация всё также немного запутанная! Я верю, что можно сделать более эффективно, но пока не хочу останавливаться на этом!
class CrossEntropyLoss:
def backward(self):
for index, layer in enumerate(Parameter.layers[::-1]):
...
elif type(layer).__name__ == 'MaxPool2d':
new_shape = np.zeros(layer.x.shape)
for k in range(layer.x.shape[0]):
for m in range(layer.x.shape[1]):
inx_ = 0
inx__ = 0
layer.i = 0
while layer.i < layer.x[k][m].shape[0] - layer.kernel_size[0] + 1:
layer.j = 0
inx__ = 0
while layer.j < layer.x[k][m].shape[1] - layer.kernel_size[1] + 1:
new_shape[k][m][layer.i:layer.i + layer.kernel_size[0], layer.j:layer.j + layer.kernel_size[1]] = loss[k][m][inx_][inx__]
inx__ += 1
layer.j += layer.kernel_size[1]
inx_ += 1
layer.i += layer.kernel_size[0]
loss = np.squeeze([layer.x > 0] * new_shape, axis=0)
# Для minpool аналогично, только поменять знак
Теперь собираем
class SimpleConvNet(Module):
def __init__(self):
super().__init__()
self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26
self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13
self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10
self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5
self.flatten = Flatten()
self.linear1 = Linear(input_channels= 5 * 5 * 8, output_channels=50, bias=True)
self.linear2 = Linear(input_channels=50, output_channels=10, bias=True)
self.relu = ReLU()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.maxpool1(x)
x = self.conv2(x)
x = self.relu(x)
x = self.maxpool2(x)
x = self.flatten(x)
x = self.linear1(x)
x = self.relu(x)
x = self.linear2(x)
return x
loss_fn = CrossEntropyLoss(l1_reg=0.001, l2_reg=0.001)
model = SimpleConvNet()
optim = Adam(model.parameters(), learning_rate = 5e-3, momentum=0.9, ro=0.9)
for i in range(5):
y_pred_list = []
y_true_list = []
for index, batch in enumerate(data_loader):
input_x, target = batch
input_x = input_x / 255
input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28)
output = model(input_x)
loss = loss_fn(output, target)
loss.backward()
optim.step()
y_pred_list.extend(output)
y_true_list.extend(np.int_(np.arange(0, 10) == target))
if index % 20 == 0:
print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
>>>loss: 2.94 index: 0 acc: 0.08
loss: 2.02 index: 20 acc: 0.23
loss: 1.76 epoch: 0 acc: 0.31
loss: 1.67 index: 0 acc: 0.53
loss: 1.06 index: 20 acc: 0.60
loss: 1.05 index: 40 acc: 0.63
loss: 0.98 index: 60 acc: 0.65
loss: 1.05 epoch: 1 acc: 0.65
loss: 0.98 index: 0 acc: 0.72
loss: 0.92 index: 20 acc: 0.64
loss: 1.27 index: 40 acc: 0.62
loss: 1.04 index: 60 acc: 0.62
loss: 0.97 epoch: 2 acc: 0.63
loss: 0.73 index: 0 acc: 0.77
Отлично! У нас всё обучается!
Теперь давайте добавим еще один метод регуляризации BatchNorm. Будем применять его после линейных слоев. Вычисление производной для BatchNorm. Здесь опять же просто механическая работа в виде следования формулам!
class BatchNorm2d:
def __init__(self, size):
self.conv = False
Parameter([self, np.ones((size)), np.ones((size))])
def __call__(self, x):
self.mean = np.mean(x, axis=(0))
self.std = np.std(x, axis=(0)) + 0.0001
self.x = (x - self.mean) / self.std
return Parameter.calling[self][0] * self.x + Parameter.calling[self][1]
class CrossEntropyLoss:
def backward(self):
for index, layer in enumerate(Parameter.layers[::-1]):
...
layer.backward_list = [np.sum(loss * layer.x, axis = 0), np.sum(loss, axis = 0)]
dl_dx = loss * Parameter.calling[layer][0]
dl_dstd = np.sum(dl_dx * (layer.x * layer.std) * (-1/2) / (layer.std ** 3), axis = 0)
dl_dmean = np.sum(dl_dx * (- 1 / layer.std), axis = 0) + dl_dstd * (np.sum(-2 * layer.x * layer.std, axis = 0) / len(layer.x))
loss = dl_dx / layer.std + dl_dstd * 2 * (layer.x * layer.std) / len(layer.x) + dl_dmean / len(layer.x)
Обучаем
class SimpleConvNet(Module):
def __init__(self):
super().__init__()
self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26
self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13
self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10
self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5
self.flatten = Flatten()
self.linear1 = Linear(input_channels=5 * 5 * 8, output_channels=50, bias=True)
self.bn = BatchNorm2d(50)
self.linear2 = Linear(input_channels=50, output_channels=10, bias=True)
self.relu = ReLU()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.maxpool1(x)
x = self.conv2(x)
x = self.relu(x)
x = self.maxpool2(x)
x = self.flatten(x)
x = self.linear1(x)
x = self.bn(x)
x = self.relu(x)
x = self.linear2(x)
return x
loss_fn = CrossEntropyLoss(l1_reg=0.005, l2_reg=0.005)
model = SimpleConvNet()
optim = Adam(model.parameters(), learning_rate = 1e-3, momentum=0.99, ro=0.99)
for i in range(5):
y_pred_list = []
y_true_list = []
for index, batch in enumerate(data_loader):
input_x, target = batch
input_x = input_x / 255
input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28)
output = model(input_x)
loss = loss_fn(output, target)
loss.backward()
optim.step()
y_pred_list.extend(output)
y_true_list.extend(np.int_(np.arange(0, 10) == target))
if index % 20 == 0:
print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}")
>>>loss: 4.56 index: 0 acc: 0.12
loss: 0.66 index: 20 acc: 0.59
loss: 0.74 epoch: 0 acc: 0.65
loss: 0.59 index: 0 acc: 0.78
loss: 1.24 index: 20 acc: 0.74
loss: 0.55 index: 40 acc: 0.77
loss: 0.54 index: 60 acc: 0.78
loss: 1.45 epoch: 1 acc: 0.78
loss: 0.49 index: 0 acc: 0.84
...
Мы сделали большую работу! Добавили много слоев, реализовали несколько алгоритмов градиентого спуска и обучили свёрточную модель для задачи классификации изображений!
Вот и на этом вторая часть подошла к концу. Полный код можно найти тут
В третьей части я планирую:
представить аналог
pytorch.tensor()
перевести все вычисления на динамический вычислительный граф
добавить Embedding, LayerNorm, ModuleList
провести рефакторинг библиотеки
добавить перенос вычислений на gpu
написать на библиотеке gpt2-1.5В и запустить его
Dokker24
Статья очень понравилась, но почему бы не следовать канонам и использовать динамический вычислительный граф как в Pytorch? Сразу извиняюсь, если вопрос неуместный