Реактивное программирование — это всё то же самое, только на стероидах, то есть мы можем создавать потоки данных всего. И также наблюдать за ними и реагировать. Update, OnCollisionEnter, Coroutine, Event, Mouse input, Keyboard input, Joystick input — все это потоки.
Все что нас окружает это потоки.

Кроме того, нам предоставляется потрясающий набор функций для объединения, создания и фильтрации любого из этих потоков. Именно здесь происходит «функциональная» магия. Поток может использоваться как данные для другого. Даже несколько потоков могут использоваться как данные в другой поток. Вы можете объединить два потока. Вы можете фильтровать поток, чтобы получить еще один, который имеет только те события, которые вас интересуют. Вы можете сопоставить значения данных из одного потока в другой новый.
Streams
В этом примере мы отслеживаем нажатие клавиши и превращаем его в событие.
void Start () {
Observable.EveryUpdate() // поток update
.Where(_ => Input.anyKeyDown) // фильтруем на нажатие любой клавиши
.Select(_ => Input.inputString) // выбираем нажатую клавишу
.Subscribe (x => { // подписываемся
OnKeyDown (x); // вызываем метод OnKeyDown c параметром нажатой клавиши
}).AddTo (this); // привязываем подписку к gameobject-у
}
private void OnKeyDown (string keyCode) {
switch (keyCode) {
case "w":
Debug.Log ("keyCode: W");
break;
default:
Debug.Log ("keyCode: "+keyCode);
break;
}
}
MultiThreading
В данном примере мы выполняем тяжелый метод в thread-e и уже полученный результат используем в main thread-e.
Observable.Start (() => { // создаем Observable из thread
int n = 100000000;
int res = Fibonacci(n); // выполняем тяжелую операцию
return res; // возвращаем результат
}).ObserveOnMainThread () // наблюдаем за результатом в main thread-е
.Subscribe (xs => { // подписываемся
Debug.Log ("res: "+xs); // получаем результат уже в main thread-е
}).AddTo (this);
Таким образом очень удобно выполнять долгие вычисления и работы с сетью.
HttpRequest
Раз уж заговорили о работы с сетью, вот небольшой пример работы с hhtp запросами.
var request = ObservableWWW.Get("http://api.duckduckgo.com/?q=habrahabr&format=json")
.Subscribe(x => { // подписываемся
Debug.Log ("res: "+x); // результат
}, ex => { // вызывается при ошибке
Debug.Log ("error: "+ex);
});
// request.Dispose (); если захотим отменить
Coroutines
Здесь мы одновременно запускаем 3 корутины, превращаем их в потоки и обьединяем их в один поток. Далее подписываемся на этот поток.
void Start () {
Observable.WhenAll ( // метод WhenAll принимает в себя Observable потоки
Observable.FromCoroutine (AsyncA), // здесь мы превращаем корутины в Observable
Observable.FromCoroutine (AsyncB),
Observable.FromCoroutine (AsyncC)
).Subscribe (_ => { // подписываемся на этот поток который создал нам WhenAll
Debug.Log ("end");
}).AddTo (this);
}
IEnumerator AsyncA () {
Debug.Log("a start");
yield return new WaitForSeconds (1);
Debug.Log("a end");
}
IEnumerator AsyncB () {
Debug.Log("b start");
yield return new WaitForFixedUpdate ();
Debug.Log("b end");
}
IEnumerator AsyncC () {
Debug.Log("c start");
yield return new WaitForEndOfFrame ();
Debug.Log("c end");
}
Также мы можем запускать корутины поочередно
Observable.FromCoroutine (AsyncA) // запускаем корутину AsyncA
.SelectMany (AsyncB) // AsyncB запуститься только после окончания AsyncA
.SelectMany (AsyncC) // И уже потом AsynC
.Subscribe(_ => {
Debug.Log ("end");
}).AddTo (this);
Асинхронная загрузка сцены
SceneManager.LoadSceneAsync ("HeavyScene") // загружаем асинхронно сцену
.AsAsyncOperationObservable () // превращаем его в Observable поток
.Do (x => { // вызывается при выполнении процесса
Debug.Log ("progress: " + x.progress); // показываем прогресс
}).Subscribe (_ => { // подписываемся
Debug.Log ("loaded");
}).AddTo (this);
Очень удобно использовать если вначале загрузить легкую сцену с loading screen-ом и уже
потом асинхронно загружать тяжелую сцену, анимации в loading screen-е не будут подвисать.
Асинхронная загрузка ресурсов
void Start () {
SpriteRenderer spriteRenderer = GetComponent<SpriteRenderer> ();
Resources.LoadAsync<Sprite> ("sprite") // асинхронно загружаем спрайт
.AsAsyncOperationObservable () // превращаем его в Observable поток
.Subscribe (xs => { // подписываемся
if (xs.asset != null) { // проверяем на null
Sprite sprite = xs.asset as Sprite; // кастим asset в sprite
spriteRenderer.sprite = sprite; // работаем со sprite-ом
}
}).AddTo (this);
}
Таким образом можно загружать prefab-ы, текстовые ресурсы и так далее.
Тоже очень удобная вещь в использовании когда надо загружать тяжелые префабы или спрайты, игра становиться по настоящему отзывчивой потому как пока что-то грузится игра не тормозит.
Timers
void Start () {
Observable.Timer (System.TimeSpan.FromSeconds (3)) // создаем timer Observable
.Subscribe (_ => { // подписываемся
Debug.Log ("через 3 секунды");
}).AddTo (disposables); // привязываем подписку к disposable
Observable.Timer (System.TimeSpan.FromSeconds (1)) // создаем timer Observable
.Repeat () // делает таймер циклическим
.Subscribe (_ => { // подписываемся
Debug.Log ("каждую 1 секунду");
}).AddTo (disposables); // привязываем подписку к disposable
}
void OnEnable () { // создаем disposable
disposables = new CompositeDisposable();
}
void OnDisable () { // уничтожаем подписки
if (disposables != null) {
disposables.Dispose ();
}
}
MessageBroker
MessageBroker в UniRx — это система издатель-подписчик, базирующаяся на RX, отфильтрованная по типу.
Издатель-подписчик (англ. publisher-subscriber или англ. pub/sub) — поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей.
Иногда разрабатывая игру нам необходимо вызвать метод у компонента к которому мы не имеем прямого доступа. Конечно мы можем использовать DI или Singleton, но это все зависит от конкретного случая. А это когда нам надо вызвать метод у множества обьектов или же когда просто хотим использовать MessageBroker.
Как было написано выше фильтрация делается по типу и чтобы не создавать кучу классов на каждого подписчика я создал класс MessageBase в котором есть fields: sender(MonoBehaviour) для упрощенного дебаггинга и функционала, id(int) через него и мы будем делать фильтрацию, data(System.Object) для передачи каких нибудь данных которые нужно кастить. Также в это классе есть статитеский метод(Create) который создает и возвращает нам MessageBase.
MessageBase
public class MessageBase {
public MonoBehaviour sender {get; private set;} // MonoBehaviour отправителя
public int id {get; private set;} // id сообщения
public System.Object data {get; private set;} // данные
public MessageBase (MonoBehaviour sender, int id, System.Object data) {
this.sender = sender;
this.id = id;
this.data = data;
}
public static MessageBase Create (MonoBehaviour sender,
int id, System.Object data) {
return new MessageBase (sender, id, data);
}
}
Также я создал класс ServiceShareData для упрощенного дебагинга где на данный моммент хранятся id всех сообщении. Это нужно для того чтобы в процессе разработки не было утечек сообщении и путаницы в коде.
ServiceShareData
public class ServiceShareData {
public const int MSG_ATTACK = 1001;
}
Пример отправки сообщения
MessageBroker.Default
.Publish (MessageBase.Create (
this, // sender MonoBehaviour
ServiceShareData.MSG_ATTACK, // message id
"attack!" // data System.Ojbect
));
Метод Publish отправляет класс который фильтруется по типу. Мы здесь отправляем MessageBase с sender-ом this, id по которому и будет фильтрация, и в конце data которая по идее может быть чем угодно.
Пример принятия сообщения
public CompositeDisposable disposables;
void OnEnable () {
disposables = new CompositeDisposable();
MessageBroker.Default
.Receive<MessageBase>() // задаем тип MessageBase
.Where(msg => msg.id == ServiceShareData.MSG_ATTACK)//фильтруем message по id
.Subscribe(msg => { // подписываемся
string data = (string)msg.data; // кастим данные в нужный формат
// можем работать как и с sender-ом так и с данными
Debug.Log ("sender:"+msg.sender.name+" receiver:"+name+" data:"+data);
}).AddTo (disposables);
}
void OnDisable () { // отписываемся
if (disposables != null) {
disposables.Dispose ();
}
}
Метод Receive имеет в себе generic по которому идет фильтрация по типу. В Where мы уже делаем фильтрацию по id сообщения. Важно понимать что подписчик(получатель сообщения) будет кастить data в зависимости от id сообщения.
MVP
Пример классического MVP паттерна. Где Model служит для хранения данных а также сериализации десериализации данных и так далее. View для отображения этих самых данных. Ну и Presenter отвечающий за бизнес логику.
Model
public class SomeModel {
public ReactiveProperty<int> count { get; private set; }
public SomeModel () {
count = new ReactiveProperty<int> (0); // инициализируем ReactiveProperty c 0
}
}
Как вы видите здесь есть ReactiveProperty в данном случае это int на стероидах на изменения которого мы можем подписаться и реагировать.
View
public class SomeView : MonoBehaviour {
public Text someText;
public Button someButton;
public void RenderCount (int count) { // отображаем данные count
someText.text = count.ToString ();
}
public void AnimateButton () { // анимируем кнопку
someButton.transform
.DOShakeScale (0.5F, 0.3F) // анимируем кнопку
.OnComplete (() => { // по окончании анимации возвращаем прежнии масштаб
someButton.transform.localScale = Vector3.one;
});
}
}
View имеет в себе текст и кнопку. А также методы анимирования кнопки и отображения данных. Для анимации используется DoTween asset.
Presenter
public class SomePresenter : MonoBehaviour {
public SomeView someView; // view к которому имеем прямой доступ
public SomeModel someModel = new SomeModel (); // model
void Start () {
someModel.count // ReactiveProperty count
.ObserveEveryValueChanged (x => x.Value) // отслеживаем изменения в нем
.Subscribe (xs => { // подписываемся
someView.RenderCount (xs); // вызываем метод отображения данных
}).AddTo (this);
someView.someButton // кнопка
.OnClickAsObservable () // превращаем клик в Observable поток
.Subscribe (_ => OnClick (someView.someButton.GetInstanceID ()))
.AddTo (this);
}
private void OnClick (int buttonId) {
if (buttonId == someView.someButton.GetInstanceID ()) {
someModel.count.Value++;
someView.AnimateButton ();
}
}
}
Как вы видите мы вначале подписываемся на изменения в reactiveProperty для того чтобы отрисовывать только изменения.
Далее подписываемся на кнопку где при клике будет вызываться метод OnClick в который запихиваем instanceId(Unity3d гарантирует его уникальность) этой кнопки.
А в OnClick идет проверка на этот самый intanceId, далее при клике увеличиваем count(reactiveProperty) и анимируем кнопку.
Буду рад вопросам и комментариям.
Рекомендую к прочтению:
github.com/neuecc/UniRx
gist.github.com/staltz/868e7e9bc2a7b8c1f754
www.reactivemanifesto.org
Исходный код
Комментарии (22)
Nirvano
19.11.2017 13:50Последний пример написан совершенно непонятно.
1) Смотрим на преобразования с кнопкой. Взяли кнопку, преобразовали ее с помощью OnClickAsObservable. На подписке уже странность: мы в место того что бы взять кнопку из параметра, хардкодим передачу в параметр конкретной кнопки на которую мы подписались. Далее еще странне: мы передаем даже не кнопку, а ее ид. А в самом методе опять сравниваем ее с захардкоженой конкретной кнопкой. В чем выгода относительно простой подписки?
2) Почему someView.AnimateButton происходит при нажатии в обработчике события кнопки, а не рядом с someView.RenderCount? Суть же в том, что бы 1 раз написать как должно вью работать при изменении модели, а здесь этот момент совершенно упущен.
Neuyazvimy1 Автор
19.11.2017 15:411) Выгоды относительно простой подписки нет. Кому как удобнее это реализовывать.
2) Суть показать mvp и reactive properties. someView.AnimateButton вызывается в обработчике событий кнопки потому что анимируется кнопка от нажатия этой кнопки. А не потому что увеличивается каунт.
Apathetic
19.11.2017 20:35В Rx, да и вообще в паттерне Observable, в самой по себе подписке выгоды нет, это обычный callback. Выгода и вся мощь этого подхода проявляет себя в комбинации стримов друг с другом. Скажем, слушать какое-то событие только между событием А и событием Б. На обычных коллбеках это все реализуется через какое-то внешнее состояние и очень легко ломается при изменениях. В случае observable — все сравнительно легко покрывается тестами и чутко относится к изменениям.
esc_fantasy
19.11.2017 15:44А в чем преимущество использования rx в разработке игр на юнити? Я правда не понимаю.
Neuyazvimy1 Автор
19.11.2017 15:57Мне лично Rx дает удобство, linq — везде. Ну и скорость разработки соответственно.
Есть куча игр которые были разработанны без Rx. Я бы посоветовал вам пока просто присмотреться к UniRx. А потом если почувствуете надобность вы всегда можете использовать его. Придерживаетесь KISS принципа.
en.wikipedia.org/wiki/KISS_principle
LeonThundeR
19.11.2017 21:12Рекомендую этот канал всем кого заинтересовал UniRx. Там скоро должен появиться толковый тутор https://m.youtube.com/watch?v=kFoBvjwzbNA
MramidX
20.11.2017 16:31Пробовал UniRx. У меня в рантайме строиться модель, в корутине. Меш, текстуры, все.Так вот в основном потоке все работает. А в другие потоки вынести не получается из-за того, что Unity API должно выполняться только в мейн потоке.
Может кто знает как решить проблему?Neuyazvimy1 Автор
20.11.2017 16:36Выносить Unity API в другие thread-ы не советую. Есть большой шанс того что в игре будет кучу рандомных багов. Решение в main-thread-е, просто нужно это все по другому как-то использовать.
ZimM
К сожалению, UniRx умирает. Обновления взять все реже и реже (последнее полгода назад), автор на контакт толком не выходит… А жаль, действительно ведь крутая вещь.
Szer
Попробуйте Akka.Streams
Живой проект с похожим функционалом.
Shersh
Причем тут акка? Акка.net — серверная технология написанная на .net 4.5 и поддерживающая .net core, а мы тут о Unity говорим. В которой до сих пор еще .net framework 3.5
Szer
Про 3.5 в юнити не знал.
А вообще Akka.Streams это про реактивные стримы, а не про акторов.
Shersh
ну они построены поверх Akka и используются в этой экосистеме. Так то есть нормальные Reactive Extensions в .net =)
Szer
Вообще не рядом.
Akka.Strems — отдельный продукт, который реализует стандарт Reactive Streams.
Да он использует акторов из фреймворка Akka для своей внутренней деятельности, но используя Akka.Streams программист пишет код как на обычном RX (подписка, мёржи потоков, параллелизация, тротлы, буферы, вот это вот всё).
При этом "нормальные" Reactive Extensions не реализуют стандарт Reactive Streams, а back pressure в issue висит с 2014го года
Shersh
Да, back pressure они до сих пор не могут реализовать, что печально. Но к слову, у Akka.net есть в планах реализовать взаимодействие между Streams и Rx — github.com/akkadotnet/akka.net/pull/3112
Lailore
Начиная с версии 2017.1 .net 4.6 :)
Shersh
В проде или «предварительная версия» ?) если последнее то это пока не всчет.
Lailore
Вы вводите в заблуждение. Последние обновление 9 дней назад, просто версия в сторе не обновляется, так как активно делается поддержка .net 4.6. Брать надо не из стора, а с репозитория
ZimM
Про репозиторий и говорил, конечно. Но да, действительно, 9 дней назад был апдейт, а я проверял последний раз месяц назад. До этого последний коммит был аж в июле, и то, чисто для поддержки 2017.1, ничего нового не было.
Lailore
Это большое дело — поддержка .net 4.6
А какие вам нужны еще обновления, помимо поддержки новых версий Unity и рантайма?
ZimM
Фиксы багов, улучшение производительности, реализация остальных операторов из Rx, вот это вот всё. Но, конечно, радует, что проект как минимум жив.
Neuyazvimy1 Автор
Насколько я помню в марте-апреле этого года Yoshifumi Kawai хотел остановить поддержку UniRx. Даже был пост на Medium-е. Но после Unite Tokyo он поменял свое мнение и взялся обратно за поддержку проекта. Тот пост на Medium-е был удален. UniRx живее живых)
forum.unity.com/threads/unirx-reactive-extensions-for-unity.248535/page-6#post-3064218