Постановка задачи
Обучение с подкреплением молодая и бурно растущая дисциплина. Это обстоятельство привело к тому, что информации об этом мало на английском и почти нет на русском языке. Особенно, если дело касается объектно-ориентированного подхода, и практических задач не из арсенала Open Gym. Стало интересно, как решать задачи RL в других средах.
Представляю вам результат простой задачи, которая как я надеюсь, убережет вас от части шишек встречающихся на этом интересном пути.
Предположим задачу, в которой нано робот с антибиотиком должен подобраться к скоплению патогенных бактерий для их уничтожения.
Загрузим Reinforsment Learning от Keras и библиотеку для анимации.
!pip install keras-rl2
!pip install celluloid
# Базовые Модули
import time # модуль для операций со временными характеристиками
import random
import numpy as np
# Модули Keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Activation, Flatten, Input, Concatenate
from tensorflow.keras.optimizers import Adam
# Модули Keras-RL2
import rl.core as krl
from rl.agents import DDPGAgent
from rl.memory import SequentialMemory
from rl.random import OrnsteinUhlenbeckProcess
# Модули визуализации
from celluloid import Camera
import matplotlib.pyplot as plt
from matplotlib import rc
rc('animation', html='jshtml')
%matplotlib inline
Среда
Для обучения с подкреплением требуется среда и агент.
Средой в нашем случае будет мигрирующие в тканях патогены. Их движение соответствует роевому поведению.
Роевое поведение описано моделью Вичека (1995 г.).С помощью этой системы можно имитировать скопления бактерий, поведение стаи птиц или косяка рыб, а также увидеть, как из простых правил появляются само упорядоченные движения.
Возьмем описание модели из статьи Создание собственной симуляции активной материи на Python. И перепишем ее используя объектно-ориентированный подход. Предполагается, что вы уже знакомы с ООП для Python.
# Имитация роевого поведения
class Colony:
# положения частицы
x : np.ndarray
y : np.ndarray
# угол направления частицы
theta : np.ndarray
# скорость частицы по осям
vx : np.ndarray
vy : np.ndarray
# Конструктор
def __init__(self,N):
self.reset(N)
# расстановка N частиц на площадке LxL
def reset(self,N):
# положения частиц
self.x = np.random.rand(N,1)*L
self.y = np.random.rand(N,1)*L
# направление и осевые скорости частиц относительно
# постоянной линейной скорости v0
self.theta = 2 * np.pi * np.random.rand(N,1)
self.vx = v0 * np.cos(self.theta)
self.vy = v0 * np.sin(self.theta)
# Шаг имитации
def step(self):
# движение
self.x += self.vx*dt
self.y += self.vy*dt
# применение периодических пограничных условий
self.x = self.x % L
self.y = self.y % L
# найти средний угол соседей в диапазоне R
mean_theta = self.theta
for b in range(N):
neighbors = (self.x-self.x[b])**2+(self.y-self.y[b])**2 < R**2
sx = np.sum(np.cos(self.theta[neighbors]))
sy = np.sum(np.sin(self.theta[neighbors]))
mean_theta[b] = np.arctan2(sy, sx)
# добавление случайного отклонения
self.theta = mean_theta + eta*(np.random.rand(N,1)-0.5)
# изменение скорости
self.vx = v0 * np.cos(self.theta)
self.vy = v0 * np.sin(self.theta)
return self.theta
# Получить список частиц в внутри радиуса r от координат x,y
def observe(self,x,y,r):
return (self.x-x)**2+(self.y-y)**2 < r**2
# Вывести координаты частицы i
def print(self,i):
return print(self.x[i],self.y[i])
# Получить координаты частиц
def get_bacteria(self):
return self.x, self.y
# Получить массив направлений частиц
def get_theta(self):
return self.theta
Описанный класс послужит нам для описания состояния среды state. Не путайте observation и state. Наблюдаемые данные observation, это только то, что наблюдает агент. Состоянием state считается вся среда: описание всех наших бактерий.
Для правильной работы в автоматическом режиме требуется через атрибуты класса action_space и observation_space описать допустимые значения action агента и представление среды observation.
Их надо наследовать от класса rl.Space. Для action_space требуется переписать методы:
sample() - возвращает случайное допустимое действие. В нашем случае возвращает число из диапазона [-1,1)
contains(x) - проверяет x на допустимость.
В атрибуте shape классов мы будем хранить форму значений
# action - скаляр от -1 до 1
class actionSpace(krl.Space):
def __init__(self):
self.shape = (1,)
def sample(self, seed=None):
if seed: random.seed(seed)
return random.triangular(-1,1)
def contains(self, x):
return abs(x) <= 1
# observation - массив
# допустимые значения можно не описывать.
class observationSpace(krl.Space):
def __init__(self):
self.shape = (5,) #
def sample(self, seed=None): pass
def contains(self, x): pass
Для того чтобы сделать среду нам надо создать класс наследуя ее из базового класса среды rl.Env предоставляемой керас. Это абстрактный класс, в соответствии с задуманной средой необходимо описать его методы:
reset() - "сотворение мира"
step(action) - изменение мира на шаге в соответствии с action
render() - вывод любой информации по состоянию мира на данном шаге.
close() - завершение экземпляра класса
В классе среды мы должны описать состояние state, наблюдение observation, награду reward.
В observation подадим 5 переменных:
Количество "захваченных" бактерий внутри радиуса R
Средний угол направления бактерий внутри R
Угол направления на центр бактерий внутри R
Угол направления на центр бактерий внутри круга R-1.5R
-
Текущий угол направления нано робота
Награда - точка приложения вашего максимального внимания. Награда должна соответствовать задаче. Мы будем строго штрафовать за потерю бактерий, тем строже чем их меньше в области видимости R. Так же, решим поощрять за приобретение и сохранение точек.
Действием,- будет угол движения нано робота. Все переменные нормализуем делением на Pi.
# наша "чашечка Петри"
class Cure(krl.Env):
# имитируемая колония
bacteria : Colony
# положение нано робота
x: float
y: float
theta: float # направление нано робота
R: float # область видимости бактерий нано роботом
n_bacteria : int # сохраняем предыдущее значение количества видимых бактерий для rewarda
# конструктор
def __init__(self):
self.bacteria = Colony(N)
self.reward_range = (-1,1) #(-np.inf, np.inf)
self.action_space = actionSpace()
self.observation_space = observationSpace()
self.R = observation_R
self.reset()
# Формирование вектора обзора observation.
# То что происходит в области видимости R от робота.
def observe_area(self):
# получим список соседей в радиусе R
observe_bacteria = self.bacteria.observe(self.x,self.y,self.R)
# получим список соседей в радиусе R*1.5
observe_far_bacteria = self.bacteria.observe(self.x,self.y,self.R*1.5)
observe_far_bacteria=np.array(np.bitwise_and(observe_far_bacteria,np.invert (observe_bacteria)))
observation = np.zeros(5)
# подадим количество соседей
n_bacteria = np.sum(observe_bacteria)
observation[0] = n_bacteria/20
# посчитаем и подадим среднее направлений соседних бактерий
sx = np.sum(np.cos(self.bacteria.theta[observe_bacteria]))
sy = np.sum(np.sin(self.bacteria.theta[observe_bacteria]))
observation[1] = np.arctan2(sy, sx)/np.pi
# посчитаем и подадим среднее направление от робота до удаленных бактерий
sx = np.sum(self.bacteria.x[observe_bacteria]-self.x)
sy = np.sum(self.bacteria.y[observe_bacteria]-self.y)
observation[2] = np.arctan2(sy, sx)/np.pi
# посчитаем и подадим среднее направление от робота до удаленных бактерий
sx = np.sum(self.bacteria.x[observe_far_bacteria]-self.x)
sy = np.sum(self.bacteria.y[observe_far_bacteria]-self.y)
observation[3] = np.arctan2(sy, sx)/np.pi
if n_bacteria:
observation[4]=self.theta/np.pi # подадим направление наноробота
return np.sum(observe_bacteria), observation
# старт симуляции
def reset(self):
self.bacteria.reset(N)
self.x = .5*L
self.y = .5*L
self.theta = actionSpace().sample()
self.n_bacteria , observation = self.observe_area()
return observation
# шаг симуляции
def step(self,action):
action = action * 3.2#np.pi
# Для экономии времени при попадании на "чистую воду"
# просчитываем симуляцию не выпуская ее для обработки сети
while True:
# шаг симуляции бактерий
self.bacteria.step()
# шаг робота
self.theta = np.sum(action) #% (2*np.pi)
self.x = self.x + dt*v0 * np.cos(self.theta)
self.y = self.y + dt*v0 * np.sin(self.theta)
self.x = self.x % L
self.y = self.y % L
# осматриваем окружение
nBacteria, observation = self.observe_area()
if np.sum(observation)!=0: break
if self.n_bacteria > 0: break
delta = nBacteria - self.n_bacteria
if delta<0:
reward = 50 * delta/self.n_bacteria
elif delta>0 and self.n_bacteria:
reward = 1+delta
elif nBacteria>0:
reward = 1
elif nBacteria == 0:
reward = 0
else:
reward = nBacteria
done = nBacteria > N/7
self.n_bacteria = nBacteria
return observation, reward, done, {}
# получить координаты робота
def get_position(self):
return self.x, self.y, self.R
# получить координаты всех бактерий
def get_bacteria(self):
return self.bacteria.get_bacteria()
# отразить отладочную информацию
def render(self, mode='human', close=False):
#print(self.n_bacteria)
pass
# завершить симуляцию
def close(self): pass
На этом этапе, давайте определим параметры среды и проиграем случайные эпизоды.
Просмотр эпизодов даст понимание с каким разнообразием ситуаций успевает столкнуться наш робот. В зависимости от этого, регулируем: количество бактерий; размер площадки; скорость и количество эпох.
Вы можете это сделать в ноутбуке Google Collab.
Агент и обучение
Среда определена. Остается создать агента.
Наш агент - нано робот, который будет двигаться с той же скоростью, что и бактерии, а управлять мы будем его угловым направлением action. Робот "видит" соседние бактерии и должен двигаться за ними достигая очага поражения.
Для решения задачи используем метод Deep Deterministic Policy Gradient (DDPG), его можно рассматривать как DQN для непрерывных пространств действий. Мы попеременно обучаем 2 сети Актера(производит действие action) и Критика(оценивает вознаграждение reward).
Для тренировки используется keras-rl класс DDPGAgent. Он берет на себя всю техническую реализацию, а нам остается написать несколько строчек кода и получить результат. ООП великая сила!
# Создадим среду и извлечем пространство действий
env = Cure()
np.random.seed(123)
assert len(env.action_space.shape) == 1
nb_actions = env.action_space.shape[0]
# Построим модель актера. Подаем среду, получаем действие
actor = Sequential()
actor.add(Flatten(input_shape=(1,) + env.observation_space.shape))
actor.add(Dense(4, use_bias=True))
actor.add(Activation('relu'))
actor.add(Dense(4, use_bias=True))
actor.add(Activation('relu'))
actor.add(Dense(nb_actions, use_bias=True))
actor.add(Activation('tanh'))
print(actor.summary())
# Построим модель критика. Подаем среду и действие, получаем награду
action_input = Input(shape=(nb_actions,), name='action_input')
observation_input = Input(shape=(1,) + env.observation_space.shape, name='observation_input')
flattened_observation = Flatten()(observation_input)
x = Concatenate()([action_input, flattened_observation])
x = Dense(8, use_bias=False)(x)
x = Activation('relu')(x)
x = Dense(5, use_bias=True)(x)
x = Activation('relu')(x)
x = Dense(1)(x)
x = Activation('linear')(x)
critic = Model(inputs=[action_input, observation_input], outputs=x)
print(critic.summary())
# Keras-RL предоставляет нам класс, rl.memory.SequentialMemory
# где хранится "опыт" агента:
memory = SequentialMemory(limit=100000, window_length=1)
# чтобы не застрять с локальном минимуме, действия модели полезно "встряхивать" случайным поведением
# с помощью Процесса Орнштейна – Уленбека
random_process = OrnsteinUhlenbeckProcess(size=nb_actions, theta=.15, mu=0., sigma=.3)
# Создаем agent из класса DDPGAgent
agent = DDPGAgent(nb_actions=nb_actions, actor=actor, critic=critic, critic_action_input=action_input,
memory=memory, nb_steps_warmup_critic=100, nb_steps_warmup_actor=100,
random_process=random_process, gamma=.99, target_model_update=1e-3)
agent.compile(Adam(learning_rate=.001, clipnorm=1.), metrics=['mae'])
# Обучим процесс на nb_steps шагах,
# nb_max_episode_steps ограничивает количество шагов в одном эпизоде
agent.fit(env, nb_steps=100000, visualize=True, verbose=1, nb_max_episode_steps=Epochs)
# Тестируем обученую сеть на 5 эпизодах
agent.test(env, nb_episodes=5, visualize=True, nb_max_episode_steps=Epochs)
env.close()
Результат
Давайте посмотрим на действия обученного нано робота. Изменим для наглядности параметры среды
v0 = 4 # линейная скорость
N = 1000 # количество бактерий
Epochs = 500 # количество шагов
L = 300 # размер области
R = 5 # радиус взаимодействия
observation_R = 2*R # Радиус видимости соседей
fig = plt.figure()
camera = Camera(fig)
random.seed(123)
theCure = Cure()
observation = theCure.reset()
# информационная плашка
props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
sum_reward = 0
for i in range(200):
action = np.sum(actor.predict(observation.reshape((1,1,5))))# % (2*np.pi)
observation, reward, done, _ = theCure.step(action)
sum_reward += reward
if done:
print('Победа на шаге',i, ' захвачено ',observation[0]*20,'бактерий. Награда ',sum_reward)
break
# покажем бактерий
bacteria_x,bacteria_y = theCure.get_bacteria()
plt.scatter(bacteria_x, bacteria_y, c='red') # метод, отображающий данные в виде точек
# покажем робота
x, y, r = theCure.get_position()
plt.scatter(x, y, c='blue')
fig = plt.gcf()
ax = fig.gca()
circle = plt.Circle((x, y), r, color='b', fill=False)
ax.add_patch(circle)
textstr = '\n'.join((
r'epoch=%d' % (i, ),
r'points=%d' % (reward, ),
))
ax.text(0.05, 0.95, textstr, transform=ax.transAxes, fontsize=14,
verticalalignment='top', bbox=props)
camera.snap()
print('Итоговое вознаграждение',sum_reward)
theCure.close()
animation = camera.animate()
#animation.save('celluloid_minimal.gif', writer = 'imagemagick')
animation
Выводы
Существует дефицит информации по RL даже на английском языке, вам объяснят основы, покажут пару стандартных задач из арсенала Open Gym, на этом - все. Документация по Keras-RL не выдерживает никакой критики.
Обучение с подкреплением имеет свои нюансы, например длительное обучение на 0.5-1 млн. шагов похоже приводит к переобучению. Сеть начинает выдавать крайние значения -1,1 ни как не реагируя на среду.
При планировании актера, если существует непрерывный допустимый диапазон органов управления, последний нейрон лучше зажать активациями sigmoid(0,1) или tanh(-1,+1) вместо linear. Затем в step() среды развернуть до требуемого диапазона.
Отдельно надо отметить, что набор подаваемых данных должен быть адекватен задаче. Не получится научить агента вождению не показывая дороги. В нашем случае пришлось показать бактерии ситуацию чуть за пределами радиуса R. Без этого наш робот просто тащился за последней бактерией в рое, боясь быть наказанным и не понимая как получить награду.
Через тернии лежит путь к звездам. Буду рад, если кому-то помог разобраться в этой интереснейшей теме.
Комментарии (8)
uchitel
31.10.2021 08:38+2Есть такая библиотека - rlib, в которой вроде бы многое идет из коробки. Но RL не панацея, по крайней мере в задачах с множественной неопределенностью и большим коэффициентом ветвления ситуаций (особенно дискретных). Многие уповают на концепции типа альфаго, вполне возможно, что это как-то поможет.
Если интересно двигать эту тему вперед (мне самому очень интересно, но времени нет), то можете попробовать сделать симулятор в котором размер награды зависит от цепочки действий, а не одного действия, а сам "мир" является случайным. Любой успех в этом направлении имел бы огромную практическую пользу, так как многие процессы, например, производства, как раз и являются цепочками действий. Иногда, использование стохастического программирования в таких процессах, позволяет добиться экономии в 70-80%.Есть хорошая книга "обучение с подкреплением" Саттон и Барто. В ней мало чего про использование DL, но много полезных концепций.
В общем:1) статья огонь!
2) вам успехов!
vandriichuk
31.10.2021 09:03А какая практическая ценность данного примера? Не могу додуматься
Dmi3Ut Автор
31.10.2021 12:36Ценность конечно не в нанороботе. Задача была в использовании произвольной среды. Среды которую можно запрограммировать без рамок Атари и т.д. в Open Gym. Удивило почти полное отсутствие информации на русском, минимум на английском. В результате простая задача затянулась на несколько недель. Кому то она может сэкономить хотя-бы неделю.
Yerin
29.11.2021 06:37+1Сейчас занимаюсь задачей оптимизации планирования производства (flexible job shop problem) с помощью RL, и тоже столкнулась с тем, что большинство найденных материалов по RL связаны с играми и gym.
Kilorad
Что с быстродействием? Через сколько кадров после пуска RL начинает вести себя систематически лучше, чем рандом?
Насколько такая система переносима на более "взрослые" задачи, вроде игр Атари, где на входе картинка с экрана?
Dmi3Ut Автор
В задаче я как раз уходил от "взрослых" игр серии Gym. Таких примеров в интернете - масса. При всем уважении к сайту тренажеру, - их практическая применимость заканчивается за порогом gym-качалки.
На обучение этой задачи ушло около получаса на Google Colab. Причем GPU не давал преимуществ.
Dmi3Ut Автор
Динамику обучения я не оценивал. Вывод keras-rl можно увидеть здесь