Это вторая публикация по материалам нашей внутренней конференции Sync.NET. Первая публикация была посвящена многопоточности в .NET.

Реактивные расширения — звучит настолько круто, что напрашивается связь с реактивными самолетами. Никакой связи, конечно, нет, но это действительно отличный инструмент. Reactive происходит от слова react (реагировать), подразумевается, что система реагирует на изменения состояния. В процессе развития программного обеспечения возникла потребность, чтобы система умела реагировать на множество источников данных, была устойчива и чтобы разные модули не были тесно связаны.

Как правило, мы пишем код, в котором есть методы и функции, которые мы вызываем, получаем результат и его обрабатываем. Rx в свою очередь позволяет создавать события и обработчики, которые будут реагировать на них. Таким образом, система будет состоять из последовательности событий, которые будут сообщать об изменении состояния и должным образом реагировать на них.

Rx состоит из двух базовых абстракций в пространстве имен System начиная с .NET 4.0, а именно System.IObserver и System.IObservable. Как видно из названия, это реализация паттерна «наблюдатель» (Observer). В данной реализации IObservable выступает как subject, и очевидно, что IObserver это наблюдатель, который может подписываться на изменения. В платформе .NET уже есть реализация наблюдателя в виде событий (Events). Как уже упоминалось, Rx позволяет создавать последовательность событий, и само собой разумеется, что это можно сделать с помощью ивентов. Способы работы с Rx и Events отличаются, но об этом немного позже.

IObserver<in T>


Предоставляет механизм получения уведомлений. Интерфейс объявляет три метода:

void OnNext(T value) — предоставляет следующий элемент в последовательности.

void OnError(Exception ex) — позволяет передать Exception и адекватно его обработать. Подразумевается, что после этого сообщения последовательность заканчивается и наблюдателям больше не нужно следить за изменениями.

void OnCompleated() — сообщается, что последовательность закончилась и больше не будет новых сообщений, не нужно их ожидать.

IObservable<out T>


Производит уведомления и позволяет подписываться наблюдателям. Объявляет один метод:

IDisposable Subscribe(IObserver<T> observer) — принимает наблюдателя (IObserver) параметром и подписывает его на сообщения. Обратите внимание, что метод возвращает IDisposable, с помощью чего можно потом вызывать метод Dispose, тем самым отписав и уничтожив наблюдателя.

Если мы захотим реализовать IObservable, то нужно будет помимо метода Subscribe также реализовать логику, которая может отправлять новые сообщения, ошибки или сообщать об окончании последовательности. Получается, что также нужно будет реализовать интерфейс IObservable, для таких целей можно использовать тип Subject. Но чтобы его использовать, нужно будет с Nuget установить дополнительную библиотеку (Install-Package Rx-Mail), которая также предоставляет дополнительные расширения и возможность использовать LINQ.

using System;
using System.Reactive.Subjects;
namespace Demo
{
    class Program
    {     
        static Subject<int> sub = new Subject<int>();//Declare
  	 static void Main()
        {
sub.Subscribe(Console.WriteLine); //Subscribe

sub.OnNext(234); //Publish
	 }
    }
}


В этом примере создается новая последовательность, то есть Subject<int> (также можно назвать последовательность int’ов), затем на нее подписывается наблюдатель (в данном случае просто выводится в консоль каждое значение последовательности), и передается значение, которое выводится в консоль с помощью наблюдателя. Каждый раз, когда подписывается новый наблюдатель, ему начинают поставляться элементы последовательности. Но есть еще несколько реализаций с другим поведением:

ReplaySubject


using System;
using System.Reactive.Subjects;

namespace Demo
{
    class Program
    {       
        static ReplaySubject<int> sub = new ReplaySubject<int>();

        static void Main()
        {
            sub.OnNext(222);

            sub.Subscribe(Console.WriteLine);

            sub.OnNext(354);
        }
    }
}


ReplaySubject — поставляет все элементы последовательности независимо от того, когда был подписан наблюдатель.

BehaviorSubject


using System;
using System.Reactive.Subjects;

namespace DemoData
{
    class Program
    {       
        static BehaviorSubject<int> sub = new BehaviorSubject<int>(666);

        static void Main()
        {
            sub.OnNext(222);

            sub.Subscribe(Console.WriteLine); // 222
        }
    }
}


BehaviorSubject — не может быть пустым, всегда содержит в себе элемент, но только последний.

AsyncSubject


using System;
using System.Reactive.Subjects;

namespace DemoData
{
    class Program
    {       
        static AsyncSubject<int> sub = new AsyncSubject<int>();

        static void Main(string[] args)
        {
            sub.OnNext(222);

            sub.Subscribe(Console.WriteLine);

            sub.OnCompleted(); // Publish 222
        }
    }
}


AsyncSubject — также возвращает только последнее значение, но, в отличие от остальных реализаций, данные будут публиковаться при вызове OnCompleated.

Теперь сравним с Event’ами, вот как выглядел бы код:

using System;
namespace Demo
{
    class Program
    {       
        static event Action<int> Ev; //Declare

        static void Main(string[] args)
        {    
            Ev += Console.WriteLine; //Subscribe

            Ev(234); //Publish
        }
    }
}


Все предельно просто, выполнение будет проходить так же, но в Rx есть ряд преимуществ перед ивентами:

  • Реализация IObservable — это классы, в которых можно делать все, что хочешь. Методы, которые объявляет IObserver, позволяют более корректно управлять последовательностью.
  • Можно сообщить, что последовательность закончилась и тем самым сделать последние нужные действия и отписаться. Есть возможность управлять ошибками.
  • В ивентах чтобы отписаться, нужно сохранить наблюдателя в какой-то переменной и как-то ими управлять. В Rx метод Subscribe возвращает IDisposable и ему можно просто вызвать Dispose(), чтобы отписаться.

var toDispose = sub.Subscribe(Console.WriteLine);
toDispose.Dispose();


  • Rx содержит множество полезные фич, перегрузок методов и расширений
  • LINQ!

LINQ


Изначально LINQ позволял делать запросы к статическим источникам данных. Но так как количество данных растет, а подходы меняются, то нужно к этому приспосабливаться. Rx позволяет выполнять запросы к динамическим последовательностям.

using System.Reactive.Linq;  // позволяет применять LINQ

namespace Demo
{
    class Program
    {       
        static void Main()
        {
            var sequence = Observable.Range(1, 10, Scheduler.Default); // создается последовательность от 1 до 10
            var query = from s in sequence
                    where s % 2 == 0
                    select s; // создается запрос, ничего не выполняется
            sequence.Subscribe(Console.WriteLine); // подписывается наблюдатель (1,2,3,4,5,6,7,8,9,10)
            query.Subscribe(Console.WriteLine); // подписывается наблюдатель (2,4,6,8,10)        
        }
    }
}


В примере сначала создается последовательность, которая предоставляет данные типа int от 1 до 10, затем к ней применяется LINQ-выражение, которое выбирает из последовательности только значения, кратные 2. Таким образом, получается две разных последовательности, на которые можно подписать разных наблюдателей. Это крайне простой пример, но Rx предоставляет очень много методов, которые дают огромную гибкость.

Выводы


Reactive extensions позволяет создавать отдельные модули, которые будут следить за состоянием системы и реагировать на него. Каждая часть системы будет полностью независима, так как она не знает ничего об остальных модулях. Наблюдатели ожидают изменения последовательности, а ей, в свою очередь, все равно, кто наблюдает за ее изменениями. Тем самым достигается связанность модулей. Rx имеет смысл применять для обработки UI-событий, доменных событий, изменений окружающей среды, изменений на сторонних сервисах (RSS, Twitter и т.д.). Rx также предоставляет возможность преобразовывать события в IObservable, что позволяет интегрироваться в систему.

Не стоит применять Rx для того, чтобы преобразовывать статические последовательности в IObservable, это будет пустая трата ресурсов и не принесет никакой выгоды. Также не стоит реализовывать очереди, так как это совершенно разные подходы. Огромным преимуществом является тот факт, что Rx поддерживает LINQ и ничего нового учить не нужно.

Rx — отличный инструмент, который позволяет создавать реактивные системы, но это не означает, что нужно все бросить и начать писать в этом стиле. Главное — всегда использовать серое вещество!

Комментарии (6)


  1. xGromMx
    24.06.2015 12:23
    +1

    Именно поэтому я когда-то стал оформлять такой онлайн справочник-агрегатор ссылок, видео и туториалов xgrommx.github.io/rx-book


  1. rfq
    24.06.2015 14:14
    +1

    «В процессе развития программного обеспечения возникла потребность, чтобы система умела реагировать на множество источников данных, была устойчива и чтобы разные модули не были тесно связаны».

    Можно подумать, в первые 50 лет развития программного обеспечения не было потребности, чтобы системы могли реагировать на множество источников данных и т.п.

    Во всей этой reactive движухе, кроме болтологии, единственная здравая мысль — это мысль о необходимости backpressure, то есть обратной связи для предотвращения переполнения памяти в случае, если поставщик работает быстрее потребителя.


    1. xGromMx
      24.06.2015 14:48

      CRP существовал еще в 60-х)


      1. rfq
        24.06.2015 14:54

        CRP у вас это что?


        1. xGromMx
          24.06.2015 15:17

          Constraint Reactive Programming


  1. JeStoneDev
    24.06.2015 17:09

    По реактивному программированию неплохой доклад был на msdevcon 2015.
    «Реактивное программирование на C# в робототехнике и при создании Kinect-приложений» от Александра Кирсанова.
    Рекомендую посмотреть.