Всем привет! Давно уже хотел написать статью о UniRx на Unity3d. Начнем с небольшой философии RX программирования. Например, разрабатывая игру, мы создаем кнопку, наблюдаем событие клика этой кнопки и реагируем на это каким нибудь кодом.

Реактивное программирование — это всё то же самое, только на стероидах, то есть мы можем создавать потоки данных всего. И также наблюдать за ними и реагировать. Update, OnCollisionEnter, Coroutine, Event, Mouse input, Keyboard input, Joystick input — все это потоки.
Все что нас окружает это потоки.

image

Кроме того, нам предоставляется потрясающий набор функций для объединения, создания и фильтрации любого из этих потоков. Именно здесь происходит «функциональная» магия. Поток может использоваться как данные для другого. Даже несколько потоков могут использоваться как данные в другой поток. Вы можете объединить два потока. Вы можете фильтровать поток, чтобы получить еще один, который имеет только те события, которые вас интересуют. Вы можете сопоставить значения данных из одного потока в другой новый.

Streams


В этом примере мы отслеживаем нажатие клавиши и превращаем его в событие.

void Start () {
  Observable.EveryUpdate() // поток update
    .Where(_ => Input.anyKeyDown) // фильтруем на нажатие любой клавиши
    .Select(_ => Input.inputString) // выбираем нажатую клавишу
    .Subscribe (x => { // подписываемся
        OnKeyDown (x); // вызываем метод OnKeyDown c параметром нажатой клавиши
    }).AddTo (this); // привязываем подписку к gameobject-у
}
private void OnKeyDown (string keyCode) {
  switch (keyCode) {
  case "w":
    Debug.Log ("keyCode: W");
    break;
  default:
    Debug.Log ("keyCode: "+keyCode);
    break;
  }
}

MultiThreading


В данном примере мы выполняем тяжелый метод в thread-e и уже полученный результат используем в main thread-e.

Observable.Start (() => { // создаем Observable из thread
  int n = 100000000;
  int res = Fibonacci(n); // выполняем тяжелую операцию
  return res; // возвращаем результат
}).ObserveOnMainThread () // наблюдаем за результатом в main thread-е
  .Subscribe (xs => { // подписываемся
      Debug.Log ("res: "+xs); // получаем результат уже в main thread-е
  }).AddTo (this);

Таким образом очень удобно выполнять долгие вычисления и работы с сетью.

HttpRequest


Раз уж заговорили о работы с сетью, вот небольшой пример работы с hhtp запросами.

var request = ObservableWWW.Get("http://api.duckduckgo.com/?q=habrahabr&format=json")
    .Subscribe(x => { // подписываемся
        Debug.Log ("res: "+x); // результат
    }, ex => { // вызывается при ошибке
        Debug.Log ("error: "+ex);
    });
//  request.Dispose (); если захотим отменить

Coroutines
Здесь мы одновременно запускаем 3 корутины, превращаем их в потоки и обьединяем их в один поток. Далее подписываемся на этот поток.

void Start () {
    Observable.WhenAll ( // метод WhenAll принимает в себя Observable потоки
      Observable.FromCoroutine (AsyncA), // здесь мы превращаем корутины в Observable
      Observable.FromCoroutine (AsyncB),
      Observable.FromCoroutine (AsyncC)
    ).Subscribe (_ => { // подписываемся на этот поток который создал нам WhenAll
      Debug.Log ("end");
    }).AddTo (this);
}

IEnumerator AsyncA () {
      Debug.Log("a start");
      yield return new WaitForSeconds (1);
      Debug.Log("a end");
}

IEnumerator AsyncB () {
      Debug.Log("b start");
      yield return new WaitForFixedUpdate ();
      Debug.Log("b end");
}

IEnumerator AsyncC () {
      Debug.Log("c start");
      yield return new WaitForEndOfFrame ();
      Debug.Log("c end");
}

Также мы можем запускать корутины поочередно

Observable.FromCoroutine (AsyncA) // запускаем корутину AsyncA
    .SelectMany (AsyncB) // AsyncB запуститься только после окончания AsyncA
    .SelectMany (AsyncC) // И уже потом AsynC
    .Subscribe(_ => {
        Debug.Log ("end");
}).AddTo (this);

Асинхронная загрузка сцены


SceneManager.LoadSceneAsync ("HeavyScene") // загружаем асинхронно сцену
  .AsAsyncOperationObservable () // превращаем его в Observable поток
  .Do (x => { // вызывается при выполнении процесса
      Debug.Log ("progress: " + x.progress); // показываем прогресс
  }).Subscribe (_ => { // подписываемся
      Debug.Log ("loaded");
  }).AddTo (this);

Очень удобно использовать если вначале загрузить легкую сцену с loading screen-ом и уже
потом асинхронно загружать тяжелую сцену, анимации в loading screen-е не будут подвисать.

Асинхронная загрузка ресурсов


void Start () {
    SpriteRenderer spriteRenderer = GetComponent<SpriteRenderer> ();

    Resources.LoadAsync<Sprite> ("sprite") // асинхронно загружаем спрайт
        .AsAsyncOperationObservable () // превращаем его в Observable поток
        .Subscribe (xs => { // подписываемся
            if (xs.asset != null) { // проверяем на null
                Sprite sprite = xs.asset as Sprite; // кастим asset в sprite
                spriteRenderer.sprite = sprite; // работаем со sprite-ом
            }
        }).AddTo (this);
}

Таким образом можно загружать prefab-ы, текстовые ресурсы и так далее.

Тоже очень удобная вещь в использовании когда надо загружать тяжелые префабы или спрайты, игра становиться по настоящему отзывчивой потому как пока что-то грузится игра не тормозит.

Timers


void Start () {
    Observable.Timer (System.TimeSpan.FromSeconds (3)) // создаем timer Observable
        .Subscribe (_ => { // подписываемся
            Debug.Log ("через 3 секунды");
        }).AddTo (disposables); // привязываем подписку к disposable

    Observable.Timer (System.TimeSpan.FromSeconds (1)) // создаем timer Observable
        .Repeat () // делает таймер циклическим
        .Subscribe (_ => { // подписываемся
            Debug.Log ("каждую 1 секунду");
        }).AddTo (disposables); // привязываем подписку к disposable
}

void OnEnable () { // создаем disposable
    disposables = new CompositeDisposable();
}

void OnDisable () { // уничтожаем подписки
    if (disposables != null) {
        disposables.Dispose ();
    }
}

MessageBroker


MessageBroker в UniRx — это система издатель-подписчик, базирующаяся на RX, отфильтрованная по типу.

Издатель-подписчик (англ. publisher-subscriber или англ. pub/sub) — поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей.

Иногда разрабатывая игру нам необходимо вызвать метод у компонента к которому мы не имеем прямого доступа. Конечно мы можем использовать DI или Singleton, но это все зависит от конкретного случая. А это когда нам надо вызвать метод у множества обьектов или же когда просто хотим использовать MessageBroker.

Как было написано выше фильтрация делается по типу и чтобы не создавать кучу классов на каждого подписчика я создал класс MessageBase в котором есть fields: sender(MonoBehaviour) для упрощенного дебаггинга и функционала, id(int) через него и мы будем делать фильтрацию, data(System.Object) для передачи каких нибудь данных которые нужно кастить. Также в это классе есть статитеский метод(Create) который создает и возвращает нам MessageBase.

MessageBase

public class MessageBase {
    public MonoBehaviour sender {get; private set;} // MonoBehaviour отправителя
    public int id {get; private set;} // id сообщения
    public System.Object data {get; private set;} // данные

        public MessageBase (MonoBehaviour sender, int id, System.Object data) {
            this.sender = sender;
            this.id = id;
            this.data = data;
        }
        
        public static MessageBase Create (MonoBehaviour sender,
            int id, System.Object data) {
                return new MessageBase (sender, id, data);
        }
}

Также я создал класс ServiceShareData для упрощенного дебагинга где на данный моммент хранятся id всех сообщении. Это нужно для того чтобы в процессе разработки не было утечек сообщении и путаницы в коде.

ServiceShareData

public class ServiceShareData {
    public const int MSG_ATTACK = 1001;
}

Пример отправки сообщения

MessageBroker.Default
  .Publish (MessageBase.Create (
      this, // sender MonoBehaviour
      ServiceShareData.MSG_ATTACK, // message id
      "attack!" // data System.Ojbect
  ));

Метод Publish отправляет класс который фильтруется по типу. Мы здесь отправляем MessageBase с sender-ом this, id по которому и будет фильтрация, и в конце data которая по идее может быть чем угодно.

Пример принятия сообщения

public CompositeDisposable disposables;
void OnEnable () {
    disposables = new CompositeDisposable();
    MessageBroker.Default
        .Receive<MessageBase>() // задаем тип MessageBase
        .Where(msg => msg.id == ServiceShareData.MSG_ATTACK)//фильтруем message по id
        .Subscribe(msg => { // подписываемся
            string data = (string)msg.data; // кастим данные в нужный формат
            // можем работать как и с sender-ом так и с данными
            Debug.Log ("sender:"+msg.sender.name+" receiver:"+name+" data:"+data);
            }).AddTo (disposables);
}
void OnDisable () { // отписываемся
    if (disposables != null) {
        disposables.Dispose ();
    }
}

Метод Receive имеет в себе generic по которому идет фильтрация по типу. В Where мы уже делаем фильтрацию по id сообщения. Важно понимать что подписчик(получатель сообщения) будет кастить data в зависимости от id сообщения.

MVP


Пример классического MVP паттерна. Где Model служит для хранения данных а также сериализации десериализации данных и так далее. View для отображения этих самых данных. Ну и Presenter отвечающий за бизнес логику.

Model

    public class SomeModel {
        public ReactiveProperty<int> count { get; private set; }
            
        public SomeModel () {
            count = new ReactiveProperty<int> (0); // инициализируем ReactiveProperty c 0
        }
    }

Как вы видите здесь есть ReactiveProperty в данном случае это int на стероидах на изменения которого мы можем подписаться и реагировать.

View

public class SomeView : MonoBehaviour {
    public Text someText;
    public Button someButton;

    public void RenderCount (int count) { // отображаем данные count
        someText.text = count.ToString ();
    }

    public void AnimateButton () { // анимируем кнопку
        someButton.transform
            .DOShakeScale (0.5F, 0.3F) // анимируем кнопку
            .OnComplete (() => { // по окончании анимации возвращаем прежнии масштаб
                someButton.transform.localScale = Vector3.one;
            });
    }
}

View имеет в себе текст и кнопку. А также методы анимирования кнопки и отображения данных. Для анимации используется DoTween asset.

Presenter

public class SomePresenter : MonoBehaviour {
    public SomeView someView; // view к которому имеем прямой доступ
    public SomeModel someModel = new SomeModel (); // model 

    void Start () {
        someModel.count // ReactiveProperty count
            .ObserveEveryValueChanged (x => x.Value) // отслеживаем изменения в нем
            .Subscribe (xs => { // подписываемся
                someView.RenderCount (xs); // вызываем метод отображения данных
            }).AddTo (this);

        someView.someButton // кнопка
            .OnClickAsObservable () // превращаем клик в Observable поток
            .Subscribe (_ => OnClick (someView.someButton.GetInstanceID ()))
            .AddTo (this);
    }
    
    private void OnClick (int buttonId) {
        if (buttonId == someView.someButton.GetInstanceID ()) {
            someModel.count.Value++;
            someView.AnimateButton ();
        }
    }
}

Как вы видите мы вначале подписываемся на изменения в reactiveProperty для того чтобы отрисовывать только изменения.

Далее подписываемся на кнопку где при клике будет вызываться метод OnClick в который запихиваем instanceId(Unity3d гарантирует его уникальность) этой кнопки.

А в OnClick идет проверка на этот самый intanceId, далее при клике увеличиваем count(reactiveProperty) и анимируем кнопку.

Буду рад вопросам и комментариям.

Рекомендую к прочтению:
github.com/neuecc/UniRx
gist.github.com/staltz/868e7e9bc2a7b8c1f754
www.reactivemanifesto.org

Исходный код

Комментарии (22)


  1. ZimM
    19.11.2017 03:24

    К сожалению, UniRx умирает. Обновления взять все реже и реже (последнее полгода назад), автор на контакт толком не выходит… А жаль, действительно ведь крутая вещь.


    1. Szer
      19.11.2017 03:35

      Попробуйте Akka.Streams
      Живой проект с похожим функционалом.


      1. Shersh
        19.11.2017 14:39

        Причем тут акка? Акка.net — серверная технология написанная на .net 4.5 и поддерживающая .net core, а мы тут о Unity говорим. В которой до сих пор еще .net framework 3.5


        1. Szer
          19.11.2017 16:20

          Про 3.5 в юнити не знал.
          А вообще Akka.Streams это про реактивные стримы, а не про акторов.


          1. Shersh
            19.11.2017 19:30

            ну они построены поверх Akka и используются в этой экосистеме. Так то есть нормальные Reactive Extensions в .net =)


            1. Szer
              20.11.2017 00:10

              Вообще не рядом.
              Akka.Strems — отдельный продукт, который реализует стандарт Reactive Streams.
              Да он использует акторов из фреймворка Akka для своей внутренней деятельности, но используя Akka.Streams программист пишет код как на обычном RX (подписка, мёржи потоков, параллелизация, тротлы, буферы, вот это вот всё).


              При этом "нормальные" Reactive Extensions не реализуют стандарт Reactive Streams, а back pressure в issue висит с 2014го года


              1. Shersh
                20.11.2017 11:24

                Да, back pressure они до сих пор не могут реализовать, что печально. Но к слову, у Akka.net есть в планах реализовать взаимодействие между Streams и Rx — github.com/akkadotnet/akka.net/pull/3112


        1. Lailore
          19.11.2017 16:51

          Начиная с версии 2017.1 .net 4.6 :)


          1. Shersh
            19.11.2017 19:29

            В проде или «предварительная версия» ?) если последнее то это пока не всчет.


    1. Lailore
      19.11.2017 03:50

      Вы вводите в заблуждение. Последние обновление 9 дней назад, просто версия в сторе не обновляется, так как активно делается поддержка .net 4.6. Брать надо не из стора, а с репозитория


      1. ZimM
        19.11.2017 05:16

        Про репозиторий и говорил, конечно. Но да, действительно, 9 дней назад был апдейт, а я проверял последний раз месяц назад. До этого последний коммит был аж в июле, и то, чисто для поддержки 2017.1, ничего нового не было.


        1. Lailore
          19.11.2017 05:26

          Это большое дело — поддержка .net 4.6
          А какие вам нужны еще обновления, помимо поддержки новых версий Unity и рантайма?


          1. ZimM
            19.11.2017 05:39

            Фиксы багов, улучшение производительности, реализация остальных операторов из Rx, вот это вот всё. Но, конечно, радует, что проект как минимум жив.


    1. Neuyazvimy1 Автор
      19.11.2017 11:56

      Насколько я помню в марте-апреле этого года Yoshifumi Kawai хотел остановить поддержку UniRx. Даже был пост на Medium-е. Но после Unite Tokyo он поменял свое мнение и взялся обратно за поддержку проекта. Тот пост на Medium-е был удален. UniRx живее живых)
      forum.unity.com/threads/unirx-reactive-extensions-for-unity.248535/page-6#post-3064218


  1. Nirvano
    19.11.2017 13:50

    Последний пример написан совершенно непонятно.


    1) Смотрим на преобразования с кнопкой. Взяли кнопку, преобразовали ее с помощью OnClickAsObservable. На подписке уже странность: мы в место того что бы взять кнопку из параметра, хардкодим передачу в параметр конкретной кнопки на которую мы подписались. Далее еще странне: мы передаем даже не кнопку, а ее ид. А в самом методе опять сравниваем ее с захардкоженой конкретной кнопкой. В чем выгода относительно простой подписки?


    2) Почему someView.AnimateButton происходит при нажатии в обработчике события кнопки, а не рядом с someView.RenderCount? Суть же в том, что бы 1 раз написать как должно вью работать при изменении модели, а здесь этот момент совершенно упущен.


    1. Neuyazvimy1 Автор
      19.11.2017 15:41

      1) Выгоды относительно простой подписки нет. Кому как удобнее это реализовывать.
      2) Суть показать mvp и reactive properties. someView.AnimateButton вызывается в обработчике событий кнопки потому что анимируется кнопка от нажатия этой кнопки. А не потому что увеличивается каунт.


    1. Apathetic
      19.11.2017 20:35

      В Rx, да и вообще в паттерне Observable, в самой по себе подписке выгоды нет, это обычный callback. Выгода и вся мощь этого подхода проявляет себя в комбинации стримов друг с другом. Скажем, слушать какое-то событие только между событием А и событием Б. На обычных коллбеках это все реализуется через какое-то внешнее состояние и очень легко ломается при изменениях. В случае observable — все сравнительно легко покрывается тестами и чутко относится к изменениям.


  1. esc_fantasy
    19.11.2017 15:44

    А в чем преимущество использования rx в разработке игр на юнити? Я правда не понимаю.


    1. Neuyazvimy1 Автор
      19.11.2017 15:57

      Мне лично Rx дает удобство, linq — везде. Ну и скорость разработки соответственно.
      Есть куча игр которые были разработанны без Rx. Я бы посоветовал вам пока просто присмотреться к UniRx. А потом если почувствуете надобность вы всегда можете использовать его. Придерживаетесь KISS принципа.
      en.wikipedia.org/wiki/KISS_principle


  1. LeonThundeR
    19.11.2017 21:12

    Рекомендую этот канал всем кого заинтересовал UniRx. Там скоро должен появиться толковый тутор https://m.youtube.com/watch?v=kFoBvjwzbNA


  1. MramidX
    20.11.2017 16:31

    Пробовал UniRx. У меня в рантайме строиться модель, в корутине. Меш, текстуры, все.Так вот в основном потоке все работает. А в другие потоки вынести не получается из-за того, что Unity API должно выполняться только в мейн потоке.
    Может кто знает как решить проблему?


    1. Neuyazvimy1 Автор
      20.11.2017 16:36

      Выносить Unity API в другие thread-ы не советую. Есть большой шанс того что в игре будет кучу рандомных багов. Решение в main-thread-е, просто нужно это все по другому как-то использовать.