Всем привет.

Это моя первая публикация на Хабре, я взволнован и вообще; и в качестве дебюта хочу представить вам свой пет-проект.

Краткая предыстория: я давно программирую на C++, C и Obj-C, и всегда хотел иметь под рукой какую-нибудь тулу, чтоб свободно и непринужденно заниматься асинхронным программингом. Помню, когда-то строил я некую аппликашку для OS X. По ходу пьесы мне потребовалось проводить поэтапную обработку потока данных - берем пакет данных, обрабатываем на первом этапе, передаем на второй, а пока что хватаем следующий пакет, пропускаем через первый этап и так далее. Я, разумеется, ухватился в первую очередь за семейство NSOperation, но оно оказалось немного не очень универсальным. А хотелось универсальности, чтоб разные типы данных и много этапов, и чтоб загружались все наличные ядра, и чтоб интерфейс был простенький - пришел, бросил кучу данных, подождал, цапнул результат, ушел. Ну ладно, подумал я, пойду подергаю семейство функций dispatch-async из GCD, тем более что NSOperation просто служит им оберткой. Но и там не нашел я удовлетворенья своим хотеньям. Ну что, придется строить велосипед - подумал я и с этого все нача...

Так я пришел к пакету ASFK (Asynchronous Flow Control Kit). Это программный пакет на Obj-C, который реализует известные мне паттерны асинхронного поведения. Ну не все конечно, но надо ставить себе цели высокие и далекие.

Начать решил с двух вещей, которые понадобились мне в ходе разработки других проектов.

Намбер 1: Конвейер (Pipeline)

Идея здесь простая: есть набор данных, которые необходимо прогнать через несколько этапов обработки. Критически важно чтобы каждый кусок данных прошел через все этапы в в четко определенном порядке; крайне важно, чтобы каждый этап принимал на входе результаты предыдущего этапа и передавал свой результат следующему; и очень важно чтоб они проходили через этапы в FIFO-порядке. Попутно хотелось, чтобы все этапы могли выполняться параллельно, то есть каждый на своем процессоре. И еще очень хотелось иметь возможность их останавливать и паузить.

Иллюстрация: представим себе, что мы хотим прочитать из файла длинную строку; поменять в ней все буквы 'а' на 'б'; затем все буквы 'б' заменить на пробелы; затем вычислить для этой строки контрольную сумму; и, наконец, забросить ее в другой файл; вернуться в начало и повторить все операции над следующей строкой. Очевидно, что на каждом этапе работы над строкой можно параллельно работать над следующей.

Для этого существует обьект Pipeline которые все эти услуги предоставляет.

Работает он следующим образом: сначала создаем пайплайн:

ASFKPipelinePar* pipeline0=[[ASFKPipelinePar alloc]initWithName:@"Pipeline0"];

впихиваем в него блоки кода, которые будут обрабатывать наши данные; 

[pipeline0 addRoutine: ^id(id<ASFKControlCallback> controlBlock, id data) {

        //Do whatever you need

        return nil;

    }];

Задаем финальный блок который соберет все результаты воедино:

[pipeline0 setSummary:^id(id<ASFKControlCallback> controlBlock,NSDictionary* stats, id data) {

           NSLog(@“We done !!”);

           return nil;

            }];

Создаем сессию, которая будет использовать это блоки:

[pipeline0 createSession:nil sessionId:@“session1”];

затем, кидаем в нее набор данных 

[pipeline0 castArray:@[@(1),@(2),@(3)] session:@“session1” exParam:nil]

и забываем про них до тех пор, пока все не пройдут обработку. 

Как оно устроено внутре?

Внутре основной единицей исполнения является сессия. Это объект который содержит копии процедур и очереди с данными. Сессий может быть неограниченное количество.

Планировщик умеет циклически вызывать каждую сессию и заставить ее выполнить очередную процедуру.

Сессии организуются внутри контейнера, который и называется конвеер (pipeline). Возможное количество конвейеров неограничено. Есть и нюансы: например, сессии могут быть постоянными и временными. Постоянные сессии живут до тех пор пока их не прибьют, и в них можно постоянно добавлять новые порции данных; временные самоуничтожаются после прогона одного набора. Также сессию можно отменить, можно ставить на паузу и снимать с паузы, следить за прогрессом исполнения.

Сложностей здесь было порядочно. Требовалось создать механизм, который бы умел распихать исполнение сессий по процессорам более-менее равномерно, чтобы они все выполнялись, в конце концов. Дополнительная сложность была в создании механизма для отмены сессий и паузы/продолжения; и необходимо было добиться правильного порядка выполнения, несмотря на то, что разные этапы могли выполняться на разных процессорах независимо друг от друга.

В самом начале работы я еще точно не знал, что получится, но некоторые ключевые моменты были ясны: хотелось более менее логичный и несложный API; и хотелось заставить работать все наличные процессоры. Насколько этих целей удалось добиться? Более-менее удалось, я считаю.

Еще две фичи (в разработке): конкурентный мап и композиция. Композиция работает подобно пайплайну: создаем объект композиции, создаем сессию, бросаем в нее данные и ждем результатов. Результат будет состоять из последовательного применения функций к каждому объекту данных. Но при этом все каждый объект данных может проходить через композицию параллельно с другими. Здесь гарантируется, что функции будут применены в строго определенном порядке. То есть, если есть данные d1, d2 ... dn и функции f1, f2…fm, то мы увидим параллельное исполнение такое: fm(… f2(f1(d1))) | fm(….(f2(f1(d2))))... . Мап работает схожим образом, но с нюансом: каждый объект данных проходит параллельно через все наличные функции. Порядок выполнения функций неопределен. А здесь параллельное исполнение примет другой вид: f1(d1) | f2(d1) ... | fm(d1) ; f1(d2) | f2(d2) ... |  fm(d2) итд;

Планы на будущее - продолжить тщательное тестирование, добавить некоторые новые фичи и паттерны использования.

Второй паттерн: Mailbox

Собственно, это альтернатива механизму нотификаций в OS X. Я некоторое время мучился с ними, закрывая глаза на которые недостатки, как-то: необходимость подписываться на конкретный тип сообщения, невозможность доставки сообщения в те уголки, которые вне eventloop-а итп. Но в конце концов я осознал что с закрытыми глазами работать неудобно и принялся проектировать альтернативу...

 В поисках более сложного удовольствия я пришел к почтовому ящику. Идея прозрачна: юзер получает FIFO контейнер, куда любой другой юзер может кидать сообщения, а владелец будет их читать. Это главный принцип. Кроме того можно создать групповые ящики - владелец один, но он позволяет посторонним читать сообщения. 

Здесь я тоже хотел выжать максимум. Поэтому загрузился несколькими целями: несложный и понятный API; богатый функционал, закрывающий как можно больше возможных сценариев использования; Еще одна сложность заключалась в моем нежелании жестко ограничивать количество контейнеров и сообщений, не заставляя при этом операционную систему давиться. Но при этом использовать наличные вычислительные мощности как можно полнее.  Еще важным соображением было чтоб API как можно меньше тормозил систему. Из-за этого пришлось много работы перевести в асинхронный режим.

Звучит амбициозно, знаю. 

Вырулилось примерно следующее: во-первых, сообщения могут быть посланы как асинхронно, так и синхронно - в первом случае, сообщение отправляется, пославший тред продолжает работу; во-втором случае, посылающий тред останавливается до тех пор, пока сообщение не прочтут. Есть и обратная возможность - читающий тред может заснуть, пока не придет сообщение.

Во-вторых есть возможность фильтрования сообщений и установления верхнего/нижнего порога для размера очереди - если очередь достигла максимума, то новые сообщения могут заменить старые или быть отвергнутыми. А если размер меньше нижнего порога, то из такой очереди нельзя будет читать, пока не будет достигнут минимум. Кроме того, сообщения могут самоуничтожаться, если их хорошо об этом попросить.

В-третьих, есть система токенов. Мне хотелось создать систему авторизации, которая бы позволяла выполнять операции только тем пользователям, которые заработали на это право. Мне не хотелось для каждой операции создавать список разрешенных пользователей (есть и другие способы конечно), поэтому я решил использовать систему токенов.

Токены характеризуются 2 параметрами: Приватный/Групповой/Мастер; и ролями - например, чтение, запись, бродкаст итп.

И если система сконфигурирована на использование токенов, то операции требуют в качестве параметра правильный токен.

Как ни странно, с токенами я заморочился тоже. Токены одноразовые - они могут потерять валидность через заданный промежуток времени, или через заданной количество использований. Впрочем, другие условия для инвалидации тоже могут быть определены. А потеряв валидность, они становятся бесполезными, ибо восстановить ее невозможно.

Сами контейнеры тоже непростые. Они могут существовать заданное время, а потом самоуничтожаться. Или уничтожать все сообщения, которые живут слишком долго. Или выбрасывать юзеров, по истечении определенного времени.

Вот например как выглядит базовый API

ASFKMailbox* mail=[ASFKMailbox sharedManager];

//Создаем групповой ящик (просто для примера)

id gr0=[ mail createGroup:@“group1” withProperties:nil secret:nil];

//Создаем персональный ящик (просто для примера)

[mail createMailbox:@“user1” secret:nil withProperties:nil];

//Забрасываем сообщение в групповой ящик.

id msgid=[mail cast:@“msg1” forGroup:@“group1” withProperties:nil secret:nil];

//Добавляем пользователя в группу; он ею не владеет, но может писать и читать сообщения.

[mail addUser:@“user1" toGroup:@“group1" withProperties:nil secret:nil]

//Забрасываем сообщение в персональный ящик.

[mail cast:@“msg2” forMailbox:@“user1” withProperties:nil secret:nil];

//Тут мы читаем 2 самых свежих сообщения, пропуская самое первое (нумерация с нуля). Результатом должен быть пустой массив:

NSArray* a0=[mail readLatestMsg:NSMakeRange(1, 2) fromMailbox:@“user1” withSecret:nil]; 

NSArray* a1=[mail readLatestMsg:NSMakeRange(1, 2) fromGroup:@“group1” forUser:@“user1” withSecret:nil];

А какие еще фичи есть?

Вообще-то много.

Из ключевых я бы отметил: бродкаст и мультикаст; блокирующий постинг и блокирующее чтение; самоуничтожение контейнеров и сообщений.

И еще управление размером очередей. Бывает так что нужно следить чтоб очередь не раздувалась. Для этого я определил для очереди верхний и нижний предел: если нижний предел выставлен, то из очереди нельзя читать, пока не наберется минимум сообщений. С верхним пределом немного сложнее. Понятно, что если размер очереди достиг максимума, то его нельзя увеличивать. Но что делать с приходящими сообщениями? в общем, я реализовал несколько стратегий решения этой проблемы. Теперь можно либо отбрасывать приходящие сообщения; либо выбрасывать самое последнее и вставлять вместо него новое; или же выбрасывать самое старое из начала и добавлять новое в конец. А можно просто определить собственный алгоритм для регуляции размера.

В планах также операции над множествами - например разослать сообщения юзерам состоящим одновременно в 2 группах или состоящим в одной, но не в другой; модерация контейнеров - например, запретить конкретному юзеру читать или писать сообщения. 

Планируются также отложенные сообщения и отмена посланных сообщений; думаю добавить какую-нибудь обратную связь, чтобы регулировать нагрузку со стороны посылки.

Ну и конечно надо продолжить тестирование.

#3

Еще одна вещь, которой я уделил меньше внимания статье - FIFO очереди.
Их несколько видов, некоторые позволяют фильтрацию, ограничение по размерам и еще кое-что.

С кодом можно ознакомиться здесь.

Конечно, можно спросить: а на кой надо было так заморачиваться? Ну, мне было интересно выжать из идеи максимум. Так чтобы в конце концов прийти к состоянию, когда уже ни убавить, ни прибавить. Похоже, что я все еще не там. Но останавливаться не собираюсь. 

Что еще следует добавить?

Есть много идей, но всему свое время. Например, в планах сделать портинг на С++. Здесь правда неясно, насколько это может быть востребовано.

А что бы вы предложили?

Стоит портировать на C++?

Комментарии (4)


  1. dyadyaSerezha
    27.09.2022 20:03
    +1

    А что бы вы предложили?

    Я ничего не понимаю в iOS, но почему не портировать/перейти на Swift или ещё что более современное?


    1. varton86
      27.09.2022 20:35
      +1

      Objective-C прикольный) А еще компилируется намного быстрее Swift'а, особенно на последних версиях Xcode.


    1. tri_tuza_v_karmane Автор
      27.09.2022 21:27
      +1

      Не хочу распыляться.
      В конце концов можно использовать Obj-C из Свифта.


  1. V1tol
    27.09.2022 21:23

    Чем-то отдалённо напомнило https://github.com/BoltsFramework/Bolts-ObjC