Управление подписками — с первого взгляда несложная тема, которая, однако, вызывает у многих проблемы на фронте. Под катом я собрал собственные наблюдения о том, как грамотно выстроить работу с подписками без утечек памяти, увеличения времени загрузки элементов и колоссальных трат бюджета.

За основу я взял библиотеку RxJs, работающую с фреймворком Angular. Однако любителям остальных фреймворков, возможно, это тоже пригодится.

Почему это важно

Подписка на обозреваемую область, на поток, который способен возвращать какие‑либо данные, привносит большую гибкость и много возможностей по манипуляциям с данными, позволяя в полной мере использовать все возможности асинхронности. Вещь по‑настоящему полезная, гибкая, однако за все возможности необходимо платить. Где‑то более высоким порогом вхождения в определенную технологию, а в каких‑то случаях материально, ведь по‑настоящему полезные вещи очень часто распространяются на платной основе.

В нашем случае возможности полноценно оперировать потоками данных обойдутся нам в цену необходимости их контролировать. Хоть это и звучит странно, ведь все возможности по контролю за потоками нам уже предоставляет необходимая библиотека, а тут придется за контроль платить контролем. Важный момент заключается в том, что мы можем создавать и изменять потоки очень большим количеством способов, однако на фоне всей этой магии часто забываем про их завершение.

По сути, это можно сравнить с подписками на новостные издания, рассылками с новыми скидками, просто с ненужными уведомлениями или же, например, с подписками на какие‑либо платные услуги. В какой‑то момент, какой бы ни был полезный ресурс, на который мы подписались, он перестает приносить нам пользу, и, если от него не отписываться, это может привести к забитому почтовому ящику, где в многообразии рассылок и спама уже не найти нужное сообщение, к ежемесячному списыванию средств со счета.

В нашем же случае, пропуск отписки на поток приведет к утечке памяти, увеличению времени загрузки элементов, в конце концов к потере удобности использования продукта, и потере пользователей.

Казалось бы, всего лишь необходимо в нужный момент отписаться, но все ли так просто?

Когда нужно отписываться, а когда нет

Точно отписываемся в пяти случаях: формы, роутер, рендерер, бесконечные обозреваемые последовательности и ngRx.

1. Формы

Необходимо отписаться от формы и от отдельных контролов, на изменение которых была подписка:

  ngOnInit() {
    this.form = new FormGroup({...});
    this.valueChangesSubscription  = this.form.valueChanges.subscribe(console.log);
    this.statusChangesSubscription = this.form.statusChanges.subscribe(console.log);
  }

  ngOnDestroy() {
    // Тут отписываемся
  }
}

2. Роутер

Если мы доверимся официальной документации, то Angular должен сам отписать нас, но в работе этого не происходит, поэтому стоит сделать это самостоятельно:

export class TestComponent {
  constructor(private route: ActivatedRoute, private router: Router) { }

  ngOnInit() {
    this.route.params.subscribe(console.log);
    this.route.queryParams.subscribe(console.log);
    this.route.fragment.subscribe(console.log);
    this.route.data.subscribe(console.log);
    this.route.url.subscribe(console.log);
    
    this.router.events.subscribe(console.log);
  }

  ngOnDestroy() {
    // Тут отписываемся
  }
}

3. Рендерер

export class TestComponent {
constructor(private renderer: Renderer2, 
            private element : ElementRef) { }

  ngOnInit() {
    this.clickSubscription = this.renderer.listen(this.element.nativeElement, "click", handler);
  }

  ngOnDestroy() {
    // Тут отписываемся
  }
}

4. Бесконечные обозреваемые последовательности

Такие можно создать с помощью оператора interval() или просто слушать какое-либо событие с помощью fromEvent():

export class TestComponent {

  constructor(private element : ElementRef) { }

  interval: Subscription;
  click: Subscription;

  ngOnInit() {
    this.intervalSubscription = Observable.interval(1000).subscribe(console.log);
    this.clickSubscription = Observable.fromEvent(this.element.nativeElement, 'click').subscribe(console.log);
  }

  ngOnDestroy() {
    // Тут отписываемся
  }
}

5. ngRx

От подписок на состояние Store ngRx также необходимо отписываться:

export class TestComponent {

  constructor(private store: Store) { }

  todos: Subscription;

  ngOnInit() {
     this.todosSubscription = this.store.select('todos').subscribe(console.log);  
  }

  ngOnDestroy() {
    // Тут отписываемся
  }
}

Отписка не потребуется при работе с Async pipe, @HostListener и конечными обозреваемыми последовательностями.

1. Async pipe

Тут вся работа выполняется за нас, нам нужно лишь создать поток, который будет возвращать данные:

@Component({
  selector: 'test',
  template: `<todos [todos]="todos$ | async"></todos>`
})
export class TestComponent {

  constructor(private store: Store) { }

  ngOnInit() {
     this.todos$ = this.store.select('todos');
  }
}

2. @HostListener

Когда навесили слушатель с помощью HostListener, отписка также не потребуется:

export class TestDirective {

  @HostListener('click')
  onClick() {
    ....
  }
}

3. Конечные обозреваемые последовательности

Есть последовательности, которые завершатся сами, и не будут употреблять драгоценные ресурсы после этого.

Например: timer (), HTTP:

export class TestComponent {

  constructor(private http: Http) { }

  ngOnInit() {
    Observable.timer(1000).subscribe(console.log);
    this.http.get('http://api.com').subscribe(console.log);
  }
}

Проблема понятна. Решения есть.

Итак, когда надо, а когда не надо мы разобрались. Но теперь становится главный вопрос: а как?

Было бы нелогично давать возможность по созданию, но не дать возможности по уничтожению. Теперь разберемся как отписаться от потока. В первую очередь необходимо понимать, что отписка происходит при уничтожении компонента, то есть, когда он прекращает свою работу в рамках приложения и мы уже не ждем никаких данных от него. Отписываемся в хуке onDestroy

Самый простой способ создать переменную, в которую помещается подписка, а при уничтожении компонента просто от нее отписаться через свойство класса:

@Component()
export class ManualComponent implements OnInit, OnDestroy {
  private subscription = Subscription.EMPTY;

  ngOnInit() {
    this.subscription = interval(1000).subscribe(observer('manual'));
  }

  ngOnDestroy(): void {
    this.subscription.unsubscribe();
  }
}

Довольно неплохой, прозрачный способ. Ушел компонент, отписались от потока. Но ведь компоненты редко умещаются в небольшое количество потоков и логика порядком больше, а значит, если подписок много, то мы каждый раз будем описывать все отписки в onDestroy. Это некрасиво, да и сил много уйдет чтобы это делать.

Идем дальше.

Создадим массив, куда просто будем пушить новую подписку, а при уничтожении компонента удалим все скопом:

@Component({…})
export class ManualComponent implements OnInit, OnDestroy {
    subscription1$: Subscription; 
    subscription2$: Subscription; 
    subscriptions: Subscription[] = [];
    ngOnInit () {
        this.subscription1$ = Rx.Observable.interval(1000).subscribe(x => console.log("From interval 1000" x));
        this.subscription2$ = Rx.Observable.interval(400).subscribe(x => console.log("From interval 400" x));
        this.subscriptions.push(this.subscription1$);
        this.subscriptions.push(this.subscription2$);
    }    
    ngOnDestroy() {
        this.subscriptions.forEach((subscription) => subscription.unsubscribe());
    }
}

Уже намного лучше, и код смотрится намного чище, но мы можем зайти дальше с этим методом:

@Component({…})
export class ManualComponent implements OnInit, OnDestroy {
    subscription = new Subscription(); 
    ngOnInit () {
         this.subscription.add(interval(900).subscribe(observer('add 1')));
    	 this.subscription.add(interval(1200).subscribe(observer('add 2')));
    }    
    ngOnDestroy() {
        this.subscription.unsubscribe();
    }
}

Избавились от лишних переменных, циклов, и всего остального, что выглядит очень избыточно. Теперь не так уж и лень все это описать и можно быть уверенным, что память никуда не будет испаряться.

Однако мы на этом не остановимся, это далеко не предел. Закрыть некоторые потребности эти методы могут, однако мы пойдем по более элегантному пути и рассмотрим способы поинтереснее.

Операторы библиотеки RxJs, которые можно использовать в целях завершения потока, когда нам это необходимо:

Методы take(n), first, takeWhile, takeUntil

Начнем с take(n), и вначале посмотрим, как его использовать:

сonst intervalCount = interval(1000);
const takeFive = intervalCount.pipe(take(5));
takeFive.subscribe(x => console.log(x));

Тут наглядно видно, что в оператор передается числовое значение, дающее ему понять, сколько значений пропустить через себя, прежде чем завершить поток. Достаточно удобно, ведь мы уже не используем хук onDestroy.

Однако с этим не все так прозрачно как кажется на первый взгляд, давайте посмотрим, из чего состоит этот оператор внутри:

export function take<T>(count: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    if (count === 0) {
      return empty();
    } else {
      return source.lift(new TakeOperator(count));
    }
  };
}

Тут мы видим переменную count, которая описывает количество значений, которые необходимо пропустить. Далее если, условие выполнено, и необходимое количество значений прошло через оператор, наш поток закрывается.

А теперь посмотрим на вот такой вот пример:

from([1, 2, 3])
    .pipe(
        filter(value => false),
        take(1)
    ).subscribe(console.log) 

take(n) всегда ожидает, что через него пройдет значение, и после первого он произведет отписку и завершит процесс. Однако в данном случае значение никогда не прилетит в оператор, и получится утечка памяти. Поэтому всегда необходимо следить, чтобы перед оператором не стояло нигде фильтра или редюсера, которые возвращают false или пустое значение.

Метод takeWhile(predicate)

Пользоваться им не сложно, и если у нас есть условие, которое следит за продолжением потока, то данный оператор нам идеально подойдет. Причем выражение, которое проходит через оператор, должно возвращать простой булевский тип:

const clicks = fromEvent<PointerEvent>(document, 'click');
const result = clicks.pipe(takeWhile(ev => ev.clientX > 200));
result.subscribe(x => console.log(x));

Теперь заглянем как он работает

protected _next(value: T): void {
  const destination = this.destination;
  let result: boolean;
  try {
    result = this.predicate(value, this.index++);
  } catch (err) {
    destination.error(err);
    return;
  }
  this.nextOrComplete(value, result);
}

Пока предикат выполняется, то есть возвращает true, события походят через оператор и все отлично, но как только предикат не выполняется и прилетает значение, то подписка будет закрыта.

И опять не все так гладко. Давайте создадим слушателя события клика мыши, и добавим в него как раз-таки наш оператор, который будет ссылаться на переменную isDestroyed, которая будет изменять свое значение в момент уничтожения компонента.

isDestroyed = false;

fromEvent(this.element.nativeElement, 'click')
     .pipe(
         takeWhile(() => this.isDestroyed)
     ).subscribe(console.log)

ngOnDestroy(){
  this.isDestroyed = true;
}

То есть, при уничтожении компонента, переменная меняется, и, по идее, подписка должна закрываться. Однако, если посмотреть внимательнее, то при уничтожении компонента, больше никто не слушает событие клика по нему, а значит, никакое значение пролетать не будет через оператор, и он не закроет подписку. Отсюда утечка.

Метод first()

Данный метод применяется точно так же, как и два предыдущих. Суть его в том, чтобы без никаких условий взять значение один раз и закрыть подписку. Казалось бы, тут точно ничего не может пойти не так, однако есть одно НО.

Посмотрим на исходник:

protected _complete(): void{
  const destination = this.destination;
  if(!this.hasCompleted && typeof this.defaultValue !== 'undefined') {
    destination.next(this.defaultValue);
    destination.complete();
  } else if (!this.hasCompleted) {
    destination.error(new EmptyError())
  }
}

Тут видно, что в случае, если в оператор зайдет значение undefined, то все упадет с ошибкой. Соответственно, необходимо не допустить возможности попадания undefined в оператор.

Метод takeUntil()

Данный оператор позволяет проходить значениям до тех пор, пока в уведомляющий поток не прокинет значение, которое не позволит ему выполнить цепочку дальше. То есть в параметрах у нас Observable, который и уведомляет оператор о прекращении действия подписки.  Стоит отметить, что тут мы не полезем под капот проверять подводные камни, так как это честный оператор и делает то, что ему сказано делать.

const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));

А вот и наиболее конкретный пример, взятый прямо из рабочего проекта. Как и было сказано в начале статьи, от контролов необходимо отписываться. В этом коде имеется подписка, слушающая изменение значений поля формы. Однако сама форма находится в компоненте, и когда он прекращает свою работу на странице, нам необходимо избавляться от всего мусора, который впоследствии навредит проекту. Для этих целей объявим ReplaySubject, способный хранить лишь одно булевское значение new ReplaySubject(1).

А в хуке onDestroy просто скажем ему выбросить значение, которое в последствии и приведет к отписке от элемента формы. Далее такая конструкция остается неизменной, и не портит код, ведь дальше попросту в каждую новую подписку стоит добавлять takeUntil(this.componentDestroyed$), и все будет работать как и ожидалось:

componentDestroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

this.form.get('approvalDate').valueChanges
  .pipe(
    takeUntil(this.componentDestroyed$)
  )
  .subscribe(v => {
    console.log(v);
  })

ngOnDestroy() {
  this.componentDestroyed$.next(true)
  this.componentDestroyed$.complete()
}

Все рассмотренные операторы выполняют свою задачу, выполняют её, как и описано в документации, однако в любом из них важен порядок использования.

Async Pipe

Конечно же нельзя не упомянуть об этой опции в Angular. Самый простой способ отписаться — это не создавать руками подписку. Для случаев, когда данные нам необходимы, чтобы их отрисовать и минимально взаимодействовать с ними, можно вообще забыть про подписки.

Сразу перейдем к примерам:

Начнем с простого, создадим в компоненте стрим, который с задержкой выбросит строку с именем

export class AppComponent {
name$ = of(‘Dan’).pipe(delay(1500));
}

А в шаблоне используем эту переменную с пайпом async

<p>{{ name$ | async }}</p>

Как результат, у нас в шаблоне появляется строка, которую выдает наша переменная.

Однако это вряд ли закроет все необходимости по работе с данными. Продолжим.

Если данные не придут, то в шаблоне нечего будет отображать, и это повлечет за собой поведение, которое мы едва ли ожидаем. Добавим проверку

<p *ngIf=” name$ | async”>
{{ name$ | async }}
</p>

Вроде все улажено, но нет, async используется дважды, а значит, и каждый раз происходит задержка перед выдачей значения. Это тоже не подходит.

Можно результат выражения name$ | async вынести в переменную в шаблоне

<p *ngIf="name$ | async as name">
  {{ name }}
</p>

Или же если необходимо отобразить целый массив со сложными данными, то мы можем сделать это так:

<p *ngFor="let user of user$ | async">
  <span>User name is {{ user.name }}</span>
  <span> User surname is {{ user.surname }}</span>
</p>

Теперь сложные данные отображаются как надо, и все это с минимумом кода, а самое главное без явных подписок. async сделает все за вас.

Решения из сторонних библиотек

Продолжим тему упрощения написания кода. Мы можем воспользоваться декораторами для решения этой проблемы.

Они позволяют написать один раз код, который проконтролирует подписки, и завершит их, когда это понадобится. Один раз напишем код и будем подгружать его с помощью метаданных.

Ну а что, если обо всем этом уже позаботились и написали эти декораторы за нас?

И можно убрать этап написания своего кода, и взять готовое решение, способное выполнять задачу с малейшими трудозатратами, ведь если бы не сторонние библиотеки, то пришлось бы описывать что‑то вроде такого:

export function unsubscriber(constructor: any) {
    const originalOnDestroy = constructor.prototype.ngOnDestroy;

    constructor.prototype.componentDestroy = function () {
        this._takeUntilDestroy$ = this._takeUntilDestroy$ || new Subject();
        return this._takeUntilDestroy$.asObservable();
    }

    constructor.prototype.ngOnDestroy = function() {
        if ( this._takeUntilDestroy$ ) {
            this.takeUntilDestroy$.next(true)
            this.takeUntilDestroy$.complete();
        }
        if ( originalOnDestroy && typeof originalOnDestroy === "function" ) {
            originalOnDestroy.apply( this, arguments );
        }
    }
}

Писать функцию, которая будет отписываться от потоков на основе takeUntil, подменять внутри хук компонента, при его выполнении вызывать методы next и complete, чтобы он завершил выполнение потока, а потом еще создавать свойство в компоненте, благодаря которому вся эта магия будет работать.

Этого делать не придется. Давайте посмотрим какую альтернативу этому можно использовать у себя в проекте.

Библиотека UntilDestroyed

Данное решение используется и в некоторых проектах, с которыми работаю я, и оно довольно хорошо себя зарекомендовало.

Всё, что нужно — это установить библиотеку и добавить декоратор @UntilDestroy(), теперь библиотека готова к использованию.

this.appMode$
    .pipe(
        untilDestroyed(this)
    )
    .subscribe((mode) => {
      this.appMode = mode;
    })

Все, что нужно сделать дальше - это добавлять встроенный оператор для RxJs, и при уничтожении компонента будет происходить отписка.

Его так же можно использовать и с другими операторами, например, с хранилищем NgRx и его оператором select:

appMode$ = this.store.pipe(
    untilDestroyed(this),
    select(ApplicationsSelectors.selectAppMode)
);

Этот метод отлично подойдет для решения проблемы с управлением подписками, оно сочетается с любыми технологиями, применимыми к RxJs, и не вредит их работе, при этом выполняя свои задачи.

Выводы

Итак, что глобально полезного можно вынести из данного материала?

Предлагаю тут отбросить большинство примеров исходного кода, а также код, приведенный мной в качестве примера и поговорить о практической пользе, удобстве в использовании.

Главное, с чего нужно начать изучение оптимизации работы с потоками — это выяснить, в каких случаях вообще необходимо об этом думать, ведь иногда все произойдет само собой и не создаст проблем.

Во‑вторых, необходимо понимать, что некоторый функционал по работе с потоками, который нам предоставляет библиотека также не заставит нас беспокоится о последствиях халатности, к такому функционалу относится, например, Async Pipe. И если использовать его по полной, то в большинстве случаев даже не понадобится лезть в компонент и совершать подписку руками.

В‑третьих, если вам дается какой‑либо функционал, который способен своим использованием вызвать проблемы, то вам точно предоставят и инструментарий, чтобы этого избежать. Сюда можно отнести, как и отписку от общего объекта, куда заносятся все текущие подписки в компоненте, так и встроенные операторы библиотеки для управления ими. Однако стоит быть внимательным, ведь все они разные и способны создать еще больше проблем.

И в конце, следует изучить решения из сторонних библиотек, ведь, как показал этот материал, данный вариант является самым привлекательным в использовании. Речь идет об UntilDestroyed.

Надеюсь, что этот материал был для вас полезным, и теперь незавершенные потоки и потери памяти обойдут стороной ваши проекты.

Комментарии (2)


  1. ubx7b8
    00.00.0000 00:00

    Ещё бы написать почему именно ReplaySubject, а не просто Subject.
    И что в случае использования takeUntil рекомендуется ставить его последним оператором внутри pipe.


    1. jQwery Автор
      00.00.0000 00:00

      ReplaySubject используется по той причине, что он не зависимо от того, где была осуществлена на него подписка отдаст значение, что гарантирует отписку в любом случае.

      А по поводу takeUntil полностью согласен, так как авершенный поток может быть переключен на другой, который не завершится, и в таких местах необходимо в конце ставить жирную точку в виде takeUntil