var nodes = arrayThatLeaks( linklist )
.leakMap( xhrLoad )
.leakFilter( responseNotEmpty )
.leakMap( insertInDocument );
// или теперь ещё и так
Promise.resolve(
arrayThatLeaks( linklist )
.leakMap( xhrLoad )
.leakFilter( responseNotEmpty )
.leakMap( makeDomNode )
)
.then( insertNodesInDocument );
Подобная запись встретилась как-то в комментариях, и то сподвигло на эксперимент – упражнение для мозгов. Асинхронные операции над массивами в форме записи цепочкой.
Определенно, что здесь представлен очередной велосипед, и кто-то сочтет за невежество сам факт появления этого кода. Но как упомянуто, это эксперимент – попытка очередного кодера «оседлать» однопоточную асинхронность javascript. Кто заглянет в код, тот будет обескуражен его качеством, а имена функций и переменных могут запутать чтеца в попытках осмысления. Но есть надежда, что кому-то этот код сможет пригодиться и не пропадет даром.
Естественно, что обработчики методов должны быть написаны с учетом асинхронности происходящего. Тут сделано по-простому:
function responseNotEmpty( r, v ) {
if( /* что-то c v*/ ) r( true );
else r( false );
}
Первым аргументом в обработчике идет служебная процедура – приёмник результата, второй аргумент – элемент массива. Результирующий массив nodes будет заполняться значениями по мере их прохождения через цепочку процедур. Порядок элементов в результирующем массиве зависит от того, какой из элементов обработается раньше, то есть, значения начального массива теряют свой индекс, даже на map().
Была мысль обратиться к современным подходам Обещаний и Асинхронных ожиданий, но они были отброшены по необъяснимым причинам. В итоге получен код, состоящий из потенциальных утечек в половине функций. А его структура есть чудовищное воплощение callback-hell – 5 ступеней на сотню строк.
Обдумано два способа
Сначала хотелось создавать большой объект с внутренним контролем состояния и исполнения всей цепочки асинхронных обработчиков. То представлялось в виде многомерного массива, целой карты с результатами и флагами, и микрофункциями, и флагами, и счетчиками, и флагами. Это было хорошо для одного метода map(), где длина массива не меняется. Но метод фильтрации сделает часть карты ненужной. Пусть даже интерпретатор и сделает ненужные ссылки фантомными для физической памяти, но это же надо постараться в коде, чтобы эти ссылки не трогать. Плюс реализация параллельных ветвей методов заставляет разращивать карту в нескольких измерениях почти бесконтрольно.
var a1 = arrayThatLeaks( [/**/] );
a1.leakMap(/**/).leakFilter(/**/).leakMap(/**/); // ветка 1
a1.leakFilter(/**/).leakReduce(/**/); // ветка 2
Второй и текущий вариант предполагает создавать отдельный набор контрольных данных для каждого вызова метода массива. Здесь упомянутые трудности схлопываются в переменную счетчик (почти). А для связки методов в цепочку введена процедура обмена takeforward(), вся «магия» местной асинхронщины. Эта процедура внутри замыкания одного метода получает внутренние процедуры из замыкания следующего метода. Что конкретно передавать выяснялось в процессе написания, оказалось, что достаточно процедуры запуска обработчика и синхронизирующей процедуры контроля счетчиков.
Детально
Первым делом, как полагается, пришлось от конкретики разных методов выделить общие вспомогательные процедурки в функцию chain(). Здесь оборачиваются обработчики и приёмники из методов, и происходит связывание с предыдущим методом через аргумент giveback(), которым обычно является предыдущая процедура takeforward(). Из chain() выходит результирующий массив, расширенный «утечковыми» методами. Расширяется массив в функции expa(), где каждый метод есть результат работы chain(). При этом, для создания метода в chain() передаются синхронизатор, приёмник и предобработчик метода, которые оборачиваются и получают немного ссылок из замыкания chain().
Такая схема работает для простых по смыслу методов map() и filter(). А вот для reduce() нужно создавать внешние хранилища промежуточных значений, что порождает анонимку над chain(). Возможно, в дальнейшем, когда методов станет больше, chain() усложнится для лучшего описания методов с сильносвязанными значениями, типа сортировки.
Процедуры, определяющие логику метода:
Синхронизатор метода запускается из внутренней процедуры контроля счетчиков и получает счетчики своего и предыдущего замыкания chain(), флаг завершения предыдущейго метода, ссылку на запуск обработчика своего метода, ссылку на запуск следующего метода и ссылку на массив результатов. Для простых методов здесь вычисляется только завершенность исполнения метода. Для сложных дополнительно определяется очередность запуска обработчиков с подстановкой всех необходимых значений, поэтому в синхронизатор приходят ссылки на запуск. Каждый раз, когда синхронизируется один метод, синхронизируются все последующие – это позволяет отрабатывать логику методов наперед.
Приёмник результатов вставляется аргументом во внешний обработчик (аргумент метода) и получает оттуда значение. Каждый приёмник в обертке связывается с конкретным индексом, может отправить какое-нибудь значение по цепочке дальше и запускает синхронизацию. Запуск цепочки здесь необязателен потому, что далеко не всегда полученному значению необходимо продолжать обработку. И индекс можно вставить любой, лишь бы без повторов.
Предобработчик метода возник лишь для реализации логики свертки, конечно же, он будет полезен и для каких-нибудь других методов. Суть предобработчика в перехвате значений из цепочки обработки. Для reduce() это важно, так как обработчик требует два значения, а из цепочки приходит одно, а что ещё важнее, логика reduce() требует вовсе остановить цепочку. К тому же, предобработчик может запустить синхронизацию – но это скорее костыль, чтобы не распылять по процедурам ссылки на запуск.
В итоге
Таким образом, цепочка методов разворачивается и уплотняется скрытыми процедурами для обеспечения контролируемого асинхронного исполнения (а как иначе?), и по ней обрабатываются значения массива, где-то они проходят без ожидания остальных, где-то ждут, где-то пропадают. Утечки здесь закономерны в силу цепочечности замыканий, и всякая кривая ссылка будет держать всю цепочку. Есть ещё концептуальный вопрос поведения в случаях опустошения массива, выбрасывать ошибку или сохранять в результаты.
UPD: Теперь можно совмещать с обещаниями. Для этого появилась проверка в обертке приёмника данных и внешний метод .then(), который лишь прикидывается обещанием.
Комментарии (9)
VasilioRuzanni
15.03.2017 13:52А какова должна быть практическая польза от этого дела. Еще не совсем понятно само определение «утечек», что это?
P.S. Что-то мне подсказывает, что на самом деле вам нужны Observables — действительно мощнецкий инструмент для борьбы с асинхронностью. Посмотрите в сторону RxJS/xstream/most.js. Или я что-то упустил? :)hahenty
15.03.2017 15:43Пользы пока пренебрежимо мало, мой код работает только с массивами. А «утечка» – стандартный эффект неспособности сборщика мусора вытравить ненужное.
Как мне показалось, rxjs, xstream, stream работают с единичными значениями. Для обработки массива по соответствующим массиву принципам нужно дописывать что-то внутри или снаружи этих библиотек. А mostjs уж очень «функциональнен», сходу толком не разобрал, но опять же – не массивы.
В целом, обсерверы – те же события, а события – это колбэки, назначаемые из другого места, которое считают правильнее.
hithimVasilioRuzanni
15.03.2017 16:14Они работают с единичными значениями «over time» — то есть, с течением времени. Вот и асинхронность. Никто не мешает сделать поток из массива и разом пустить его через цепочку операторов стрима. Получится своего рода «асинхронный массив» (хотя и не совсем, но результат, как мне кажется, будет именно тот, что нужен).
Тут, собственно, весь вопрос сводится к тому, что именно вы хотите делать с массивами с практической точки зрения?hahenty
15.03.2017 17:08Свернуть через удаленный ресурс, например.
Конечно же, методов понадобится побольше, бэкенд нужно подготовить для такого и обеспечить транспортировку данных. Важно же то, что завязка на массив – пусть и малая часть всей логики – может быть учтена, а не создана вновь.
lolmaus
Код у вас получился весьма сложночитаемый, я поленился в него вникать, извините.
Пример использования мне показался притянут за уши. Было бы интересно вычисления выполнять в WebWorker'е, тогда асинхронность будет оправдана.
Но зачем все это, если есть промисы? Пример с использованием каррированных функций:
```js
Promise
.resolve( [ 1.1, 5.4, 8.3, 2.7, 4.9, 4.4, 7] )
.then(mapAsync(plus(3)))
.then(filterAsync(greater(3)))
/*… */
```
hahenty
Промисы – это дело предпочтений и знаний, мои остановились на колбэках. Про webworker-ы говорят, что они не имеют доступа к DOM, а так хочется.
Вероятно, когда-нибудь переделаю свой код, чтобы он мог жевать промисы, рассовывать цепочки по worker-ам, читать мысли… но пока я лишь доказал для себя свою маленькую идею.
А пример на числах для упрощенного восприятия вычислений в консоли.
hithim
Посмотрите Rx.js и нативные Stream в node.js