«Если затраты на разработку архитектуры кажутся Вам чрезмерными, подумайте, во сколько Вам может обойтись неправильная архитектура»


— не могу точно вспомнить источник

Когда то, «давным-давно, в одной далекой галактике», я приобрел замечательную книгу Чарльза Уэзерелла «Этюды для программистов», в предисловии к которой автор обосновывал необходимость изучения учебных примеров и задач перед тем, как начать самостоятельное программирование. Настоятельно рекомендую данную книгу найти, предисловие прочитать (и не останавливаясь на этом, прочитать оставшуюся часть и решить приведенные в ней задачи), поскольку лучше автора обосновать необходимость подобной практики я не смогу. Даже если Вы последуете моей рекомендации, и получите множество знаний и практических навыков при чтении упомянутой книги, можно будет вернуться и дочитать данный пост, поскольку он посвящен несколько иным вопросам. А если Вы моим рекомендациям не последуете, то тем более следует войти под кат.

Не так давно в посте, в котором я ругал высказывал свое мнение об одной отечественной ОСРВ, я упомянул, что реализацию кольцевого буфера в известной (и в определенных аспектах cовершенно замечательной) библиотеке mcucpp нельзя считать идеальной. Постараюсь объяснить свою точку зрения и представить идеальную (насколько это возможно в реальном мире) реализацию. Примечание — предлагаемый Вашему вниманию текст пролежал в «неоконченном» довольно таки долго, а тут подвернулся такой удобный случай.

Продолжаем разработку библиотеки для работы с периферийным устройством, и у нас на очереди управление памятью и буферизация (да, мы все еще продолжаем подготовительные операции, но без них никак). Откуда появляется необходимость организации буферов и что это за зверь такой? Дело в том, что значительная часть периферии имеет ограниченное быстродействие и процесс передачи, будучи запущен тем или иным способом, занимает определенное, и порою весьма значительное по сравнению с созданием очередной порции информации для передачи, время. Разумеется, что до истечения этого времени очередная передача не может быть выполнена и, соответственно, не может быть запущена.

Мы имеем классический случай пары «writer-reader» с различными скоростями работы. Решить эту проблему в общем виде просто невозможно, поскольку «при сколь угодно малом, но не нулевом, превышении потока запросов над потоком обслуживания размер очереди стремится к бесконечности», а бесконечность принципиальна не реализуема. Но частный случай задачи, когда мы имеем локальные всплески запросов, но в среднем поток обслуживания способен справиться с нагрузкой, буферная память достаточной емкости решить позволяет. Обратим внимание на словосочетание «достаточной емкости», мы позже научимся ее рассчитывать, пока нам достаточно того факта, что это принципиально возможно.

Является ли наличие буферной памяти абсолютным требованием — конечно же, нет. Для передаваемой информации можно применять блокирующую запись, а вот с принимаемой несколько хуже, ее придется где-то складывать до обработки, если не принять соответствующих мер в протокола верхнего уровня (магическое выражение xon/xoff родилось не на пустом месте), что не всегда реализуемо и, в любом случае, обычно приводит к существенному ограничению скорости передачи. Существует и аппаратная реализация внутренних буферов в периферийных устройствах (хотя бы на один элемент), но сделано это не всегда и размер буфера жестко ограничен сверху.

Поэтому мы программный буфер реализовывать все-таки будем, для чего естественно было бы применить метод FIFO (то есть очередь) для организации подобного буфера, а очередь, в свою очередь, лучше всего реализовывать на кольцевом буфере с двумя указателями. Когда я пишу «лучше всего», это вовсе не означает, что другие реализации (например, ссылочная очередь) невозможны, либо имеют неустранимые недостатки, помимо фатального. Это выражение всего лишь означает, что реализация будет не слишком сложна и достаточно эффективна, хотя другие могут иметь неоспоримые преимущества перед ней, за которые им придется заплатить чем-либо, поскольку ДарЗаНеБы.

Так как весьма маловероятно, чтобы используемая Вами модель МК имела аппаратную реализацию подобного устройства общего назначения (отдельные периферийные модули могут иметь свои собственные кольцевые буфера, но к теме данного поста они отношения не имеют), нам придется создавать кольцевой буфер в линейной памяти (реализовывать на векторе, это, вообще то, единственный естественный объект в адресуемой памяти), а для этого потребуется индекс буфера (а может, даже два индекса, но об этом позже). На мой взгляд, кольцевой буфер с двумя указателями (индексами) — это единственный приемлемый способ реализации очереди на векторе, но существуют разные точки зрения на этот вопрос и я своими глазами видел реализацию в стиле «х1=х2; х2=х3;… х8=новый символ», если позволите, я такую экзотику рассматривать не буду. То, что приведенный фрагмент может иметь право на существование в некоторой конкретной, весьма ограниченной ситуации, никак не делает его приемлемым в общем виде.

Рассмотрим правильную реализацию программного модуля организации указателя и для начала обратим внимание на первое слово в определении. Чем отличается правильный код от неправильного — не только тем, что правильный код не содержит ошибок, хотя это абсолютное требование. Даже полностью выполняющий свои функции код может быть неправильным, если он непонятен, или если существует вариант, который не менее понятен, но выполняется быстрее, или выполняется так же быстро, но понятнее написан, поэтому понятие правильности несколько относительно. Продолжим рассмотрение нашего примера реализации буфера, который позволит нам продемонстрировать разницу между разными степенями правильности.

Прежде, чем перейти к сути, одно важное замечание по поводу дальнейшего изложения. Я подразумеваю, что Ваш компилятор всегда включен на ненулевой (-О2) уровень отпимизации, так что мы можем не задумываться о мелких улучшениях типа 1)префиксной модификации против постфиксной, либо 2)использования результатов предыдущей операции, либо 3) разницей между инкрементом и прибавлением единицы и так далее — мы предполагаем, что компилятор многое сделает за нас. Конечно, это не строгое предположение, но в противном случае нам придется опустится в недра ассемблера, что в наше время мэйнстримом не является.

Напомню, что нам поручили реализовать индекс (указатель) кольцевого буфера, то есть требуется создать поведение переменной, которая последовательно пробегает ряд значений, от некоторого начального до некоторого конечного. Сразу же предположим, что начальное значение будет равно нулю, иначе нам сразу же придется написать более-менее правильный код, а это противоречит учебным целям и мы не торопимся, а конечное равно Max.

Данное поведение переменной можно реализовать при помощи следующей конструкции:

volatile int Counter = 0;
Counter = (++Counter) % (Max+1);

и именно такой код мы можем видеть в множестве (то есть весьма часто) случаев. Что тут неправильного — ну, во-первых, какое то время (от выполнения операции инкрементирования до присвоения результата) наша переменная будет больше максимально допустимого значения и, если именно в этот момент произойдет прерывание, которому потребуется учитывать значение этой переменной, то лично я предсказать результаты не берусь. Поэтому перепишем программу:

int Counter=0;
Counter = (Counter + 1) % (Max + 1);

Одну неправильность мы устранили, причем код (здесь и далее я буду под словом «код» иметь в виду исполняемый код, порожденный компилятором) не стал длиннее и исполняется не дольше (на самом дел он исполняется быстрее, но лишь потому, что в первом варианте используется совершенно излишнее в данном случае слово volatile), и не стал менее понятным (скорее даже более понятным, но это дело вкуса).

Необходимое замечание о volatile — данная директива нужна, если мы хотим избежать оптимизации кода, приводящей к неверному исполнению, и именно в данном конкретном случае (когда вне зоны видимости модуля значение переменной не меняется и в области видимости нет последовательных записей в нее) она (директива) совершенно излишня. Настоятельно рекомендую посмотреть порождаемый код для обоих вариантов на сайте godbolt.org. Почему не следует злоупотреблять директивой volatile, в отличие от ключевого слова static, которое рекомендуется использовать везде, где возможно. Ну, во-первых, мы запрещаем оптимизацию, то есть код однозначно не станет быстрее (скорее всего, он станет больше и медленнее, но мы предпочитаем строгие формулировки). А во-вторых, в данном конкретном случае это слово вводит в заблуждение, поскольку по отношению к нашей программе значение счетчика никоим образом не может измениться вне нашего контроля. В программе, считывающей его значение — то есть собственно в реализации кольцевого буфера можно считать счетчик изменяемым вне модуля, да и там это под вопросом, поэтому здесь данный атрибут к счетчику просто неприменим. Если одна переменная должна трактоваться по разному в разных модулях, к нашим услугам объединения, если же речь идет об организации критической секции, например, при реализации транзакции или атомарных операций, то здесь эта директива не дает вообще ничего.

Возвращаемся к коду и видим, что программа до сих пор неправильная — в чем тут дело — а дело в том, что она делает не то, что нам требуется (смотрим описание задачи), а совсем другое (вычисляет остаток от деления), просто результаты совпадают. Ну это мы так думаем (я так не думаю, но авторы кода — несомненно), что результаты совпадают, на самом деле в общем случае они не совпадают, просто нам повезло с диапазоном изменения переменной (положительные значения). Более того, процесс выполнения кода более длителен, чем можно бы было сделать, поскольку в лучшем случае мы имеем исполнение операции целочисленного деления (если она входит в состав команд нашей архитектуры), а она выполняется отнюдь не за один такт процессора (характерная величина 10 тактов для 8 битной архитектуры), а в худшем случае мы увидим вызов процедуры деления из стандартной библиотеки (и хорошо, если короткого деления), тут время исполнения будет составлять десятки тактов.

Так почему же такой совершенно неправильный подход до сих пор можно встретить весьма часто. Тут из зала мне подсказывают, что при значении Мах+1, являющемся стенью двойки, компилятор догадается вместо операции деления поставить операцию побитового умножения на соответствующую маску (равную Мах), которая выполнится весьма быстро и все будет хорошо.

Я бы согласился с подобным утверждением и принял данный подход, если бы не следующие обстоятельства:

  • это возможно только для Мах, статически определенного на этапе компиляции,
  • это происходит только при включенной оптимизации,
  • это происходит только при Мах, отвечающем данному условию,
  • это происходит не для всех кардинальных типов.

Более того, именно в данном конкретном случае (когда переменная у нас определена, как знаковая) кроме команды умножения (логического) на маску, будет еще сгенерирована команда сравнения с нулем и ветка для отрицательных значений, и, хотя для нашего диапазона эта ветка никогда не выполнится, место в памяти она займет (а в случае подставляемой функции займет неоднократно) и время на выполнение операции сравнения потребует, если не верите, опять таки следуем на указанный сайт и смотрим сами. Еще один аргумент в пользу без-знаковых кардиналов, чему я недавно посвятил целый пост.

Поэтому, если мы хотим использовать именно логическое умножение с маской (получащееся при оптимизации вычисления остатка), то нам следует переписать модуль соответствующим образом:

typedef uint8_t Counter_t;
static inline Counter_t NextCounter(const Counter_t Counter) {
#if  IS_POWER2(Max + 1) 
return (Counter + 1) & Max
#else
return (Counter + 1) % (Max + 1);
#endif
};

В этом варианте все совершенно понятно и контролируемо и при этом все верно (хотя ряд недостатков и остался, но они теперь очевидны, а не замаскированы), поэтому он правильный, хотя есть и правильнее и мы сейчас их поищем. Главный недостаток, на мой взгляд — нарушение принципа KISS, поскольку применение операции остатка от деления совершенно очевидным образом этим принципом пренебрегает. Поэтому мы сейчас одним ударом все недостатки уничтожим (не волнуйтесь за их судьбу, они 100500 раз возродятся, ведь не все программисты для Ардуино читают мои посты).

Но сначала небольшое отклонение в сторону. Как мы можем реализовать проверку на степень двойки (число в двоичной записи может быть представлено, как {0}1{0} ), которую мы только что использовали

не подглядывайте
#define IS_POWER2(N) ( ( ((N) — 1) & (N) ) == 0)

А как мы можем реализовать проверку того, что число является правой последовательностью единиц {0}1{1} в двоичной записи — один вариант очевиден

#define IsRightSequence(N) IsPower2 ((N) + 1) 

а второй тривиален

#define IsRightSequence(N) ( (((N) + 1) & (N)) == 0)

Примечание: не могу не вспомнить великолепную теорему «Трансцедентное число в трансцедентной степени всегда является трансцедентным, за исключением тех случаев, когда обратное очевидно, либо тривиально».

А как мы можем проверить, что число является последовательностью единиц {0}1{1}{0}

#define IsSequence(N) IsPower2( (N) ^ ((N) << 1))

Ну и напоследок — как выделить младший бит числа (не знаю, зачем это может потребоваться, но вдруг пригодится)

#define LowerBit(N) ((((N) - 1) ^ (N)) & (N)).


А вот придумал, для чего может пригодиться
#define IsRightSequence(N) (IsSequence(N) && (LowerBit(N) == 1))

Любопытное наблюдение — эти макросы не вполне корректны, по ним получается, что 0 является одновременно и степенью двойки и правой последовательностью (разумеется и последовательностью тоже), что немного странно. А вот 1 является всеми этими объектами совершенно справедливо, так что ноль, похоже, надо просто учитывать отдельно. Еще одно интересное свойство этих макросов — мы не делаем никаких предположений о длине аргумента, то есть они корректно работают с любым кардинальным типом.

Есть замечательная книжка «Трюки для программистов», там можно найти упомянутые макросы и множество других, не менее забавных и поучительных задач, настоятельно рекомендую ее к прочтению, тем более, что в ней не слишком много букв.

Но вернемся к нашему индексу кольцевого буфера. Мы дали правильное решение, но пообещали еще правильнее, значит у нашего последнего решения есть недостатки (кто бы сомневался). Один из них — длина буфера должна быть статически определена на этапе компиляции, второй — в случае неудачной длины время выполнения весьма велико и есть еще определенное количество неправильностей в относительно небольшом кусочка программы, что заставляет вспомнить анекдот про 4 ошибки в написании слова «еще». Устраним их все (некоторые оставим на потом) и сразу, для чего, наконец то, напишем решение исходной задачи, как она есть:

static inline Counter_t NextCounter(const Counter_t Counter) {
   if  ((Counter + 1) > Max) {
      return 0;
   } else {
      return Counter + 1;
   };
};

(Как Вы уже поняли, я сторонник египетских скобок и с этим ничего не поделать).

Обратим внимание на то, что мы просто переписали условие задачи с естественного языка на выбранном языке программирования, поэтому оно получается предельно ясным и понятным. Можно ли его улучшить — несомненно, но уже только с точки зрения быстродействия кода, поскольку других недостатков у данного решения просто не осталось (это очевидных не осталось, на самом деле они есть и мы их успешно устраним).

Оценим вычислительную сложность данного решения — сложение с единицей (1) и сравнение (2) всегда, далее присвоение нуля (1) (редко) либо сложение (1) (почти всегда) — что дает 1+2+1+?~4 элементарные операции и ноль памяти. Вполне возможно, что хороший компилятор в правильном режиме сделает определенные оптимизации и сократит время выполнения кода, но лучше мы сделаем это сами в явном виде. Вот следующий вариант:

static inline Counter_t NextCouner(const Counter_t Counter) {
   register Counter_t Tmp;
   Tmp = (Counter + 1);
   if (Tmp > Max) {
      Tmp = 0;
   };
   return Tmp;
};

Оцениваем сложность — сложение и сравнение всегда, присвоение нуля (редко) — приблизительно 3 операции и один элемент памяти. На самом деле в предыдущем варианте тоже был один элемент памяти (неявный), так что у нас чистый выигрыш в одну элементарную операцию. Более того, предыдущий вариант имел еще два недостатка — 1) нарушал принцип DRY (дважды вычислял увеличение на единицу) и 2) имел более, чем одну точку выхода, что не есть хорошо. В понятности мы тоже не потеряли, то есть нам удалось убить кучу зайцев одним выстрелом, да еще и патронов не потратили — прямо история в стиле барона Мюнхаузена.

Обратим внимание что я не стал употреблять конструкцию if ( (Tmp = Counter + 1) > Max), хотя она содержит явное указание компилятору постараться не делать избыточных пересылок. Это вкусовщина в самой неприкрытой форме, я просто не люблю возвращаемое оператором присвоения значение и стараюсь избегать его использования. Причину этого сильного чувства я объяснить не смогу, если верить Фрейду, это, скорее всего, психологическая травма в детстве. Современные компиляторы вполне способны провести несложную оптимизацию самостоятельно, да к тому же я еще добавил квалификатор регистра, так что код для моего варианта и правильного (с точки зрения языка С) будет совпадать. Тем не менее, я нисколько не ограничиваю Вашу свободу использовать тот способ, который Вам представляется предпочтительным.

Продолжаем улучшения, ведь совершенству нет предела, а мы его пока не достигли. Для того, чтобы его достичь, несколько переформулируем исходную задачу и оставим только требование переменной находится в диапазоне значений, не указывая направление изменения. Данный подход позволяет переписать программу следующим образом

static inline Counter_t NextCouner(const Counter_t Counter) {
   register Counter_t Tmp;
   Tmp = (Counter - 1);
   if (Tmp < 0) {
      Tmp = Мах;
   };
   return Tmp;
};

На первый взгляд, ничего особенно не изменилось, но, тем не менее, мы получаем выигрыш по времени. Конечно, не за счет того, что операция уменьшения на единицу работает быстрее, чем операция увеличения на нее же (хотя мне приходилось слышать подобную версию), а за счет особенностей выполнения сравнения. Если в предыдущих вариантах я считал сравнение за 2 элементарных операции (сначала идет вычитание, а потом принятие решения), то в данном случае результат предыдущей операции используется при принятии решения напрямую и сравнение занимает одну элементарную операцию, что приводит к двум операциям всегда и одному присвоению (редко) и мы сэкономили одну операцию (не потеряв ни в чем), как говорится «мелочь, а приятно». Является ли полученное решение идеальным — к сожалению, нет. Оно слегка уступает решению с маской (которое требует ровно 2 элементарные операции) по скорости работы и это, пожалуй, единственный его недостаток.

Существует и еще более быстрое решение — просто увеличивать (уменьшать) значение счетчика и больше ничего не делать, но оно возможно только в единственном случае, когда максимальное значение совпадает с максимально представимым в принятом типе значением. Для счетчика длиной 8 разрядов (то есть типа uint8_t) это будет значение 255, тогда мы просто пишем Counter = Counter + 1 и поверьте мне на слово, что писать Counter += 1 либо же ++Counter совершенно не обязательно, хотя многие именно так и напишут и будут абсолютно правы. Если мы не рассматриваем всерьез версию о необходимости экономить символы (поскольку первый вариант самый длинный), то смысла в этом никакого, по крайней мере, если мы пишем программу для ARM или AVR архитектуры (для других я просто не проверял, подозреваю, что результат будет такой же) под GCC компилятором (автор понимает, что пишут программу в редакторе интергированной среды программирования, это всего лишь речевой оборот из прошлого, когда компьютеры были большими, а память маленькой), и с включенной оптимизацией любого уровня, поскольку порождаемый код будет абсолютно идентичен.

Современные компиляторы весьма и весьма продвинулись в плане оптимизации и порождают действительно очень неплохой код, конечно, если у Вас включен соответствующий режим. Хотя я готов согласиться, что вреда никакого подобные языковые конструкции не несут, а пользу при определенных условиях принести могут, единственное замечу, что выражения Counter++ (в данном конкретном случае, конечно) следует избегать однозначно, поскольку оно предназначено совсем для других ситуаций и может породить более медленный код, хотя и необязательно.

Другой вопрос, что буфер длиной 256 элементов не всегда приемлем, но если у Вас достаточно памяти, то почему бы и нет. При такой реализации, если Вы сможете буфер выровнять на границу страницы, то доступ к элементам может быть сделан весьма быстрым за счет исключения операции перехода от индекса к указателю (ключевое слово union подскажет Вам реализацию подобной фичи, я его приводить не буду, чтобы не учить плохому), но это уже весьма и весьма специфичное решение с сильной привязкой к архитектуре, которое находится опасно близко к трюкам в худшем смысле этого слова, а это не наш стиль.

Разумеется, никто не запрещает нам написать обертку, которая будет вызывать тот или иной метод в зависимости от значения максимального (и минимального, поскольку многие методы просто не работают при не равном нулю минимуме) значения счетчика, основные принципы такого решения я уже предложил, так что предложим это в качестве упражнения.

Ну и в заключение подведем итоги — сведем вместе разные реализации работы с кольцевым индексом и оценим их свойства.
Метод Универсальность Время исполнения
± 0 (1) 1
± % 1(7) 2
+ if 3 (любое) 3.х
— if 3 (любое) 2.х

Во второй строке в скобках показано количество значений размеров буфера (не превосходящее 256), для которых данная реализация доступна, при этом имеется в виду, что буфер размером 0 нас не интересует.

Как нетрудно видеть из данной таблица, ДарЗаНеБы (мое любимое выражение, как Вы могли заметить), и преимущества покупаются ценой недостатков, единственное, что можно заявить однозначно — инкремент с проверкой имеет более успешного конкурента в лице декремента с проверкой и не проходит в следующий тур ни при каких условиях.

Необходимое замечание — существуют языки программирования, в которых нам бы вообще не пришлось задумываться о реализации индекса, а просто можно было бы использовать интервальный тип. К сожалению, реализацию данных конструкций в коде я не могу назвать оптимальной, поскольку данные конструкции (и данные языки) предназначены совсем не для оптимизации по времени исполнения, а жаль.

Итак, мы сделали правильный модуль (какое сильное название для инлайновой функции) работы с индексом, и теперь мы готовы приступить к реализации собственно кольцевого буфера.

И для начала нам следует определиться, что именно мы от этого программного объекта хотим. Совершенно необходимо иметь возможность поместить элемент данных в буфер и извлечь его — итого два основных метода, своего рода геттер и сеттер. Теоретически можно представить буфер без одного из этих методов, а то и без обоих (мало что нельзя представить чисто теоретически), но практическая ценность подобной реализации под большим вопросом. Следующий необходимый функционал — проверка на наличие информации — может быть реализован как в виде отдельного метода, так и в виде специального значения (либо признака), возвращаемого чтением. Обычно предпочитают первый метод, так получается понятнее и не слишком дорого.
А вот проверка на полноту буфера уже под большим вопросом — эта операция потребует дополнительного времени, которое будет расходоваться всегда при записи, хотя нас никто не заставляет использовать ее — так что, пусть будет. Больше нам от буфера ничего не надо, запомним эту фразу на будущее.

Возвращаемся к реализации. Нам потребуется место для хранения элементов очереди и два индекса — один для записи в буфер и один для чтения из него. Как именно мы получим это место (и эти указатели) — тема отдельного разговора, пока оставим этот момент за скобками и считаем, что мы их просто имеем. Некоторые (в том числе и уважаемые мной авторы книги «Программирование для математиков», рекомендую ее для прочтения), используют еще и счетчик заполненных мест, но мы так делать не будем и далее по тексту я постараюсь показать, почему это зло.

Сначала об индексах — сразу же обратим внимание, что это индексы, а не указатели, хотя иногда я позволял себе называть их и так. Почему именно индексы (хранящие информацию о номере элемента очереди), а не указатели (хранящие информацию о расположении в памяти элемента очереди)- вопрос очень непростой, бывают ситуации, когда указатели выгоднее, но это явно не наш случай. У нас очереди будут небольшой длины (даже на 256 мы посматриваем с опаской), поэтому индексы займут меньше места, для них быстрее будут выполняться элементарные операции, не будет проблем с атомарностью операций (в нормальной архитектуре их и с указателями быть не должно, но с индексами длиной 8 бит просто не будет никогда, конечно, если у Вас не 4-х битный контроллер), дополнительные расходы, связанные с переходом от индекса к указателю, будут не слишком велики (при условии, что элементы очереди имеют небольшой размер).

Заметки на полях
Например, в реализации языка С для архитектуры х51 (байтовой в своей основе) указатели имели размер 2 (в случае включения определенной прагмы) или даже 3 байта (по умолчанию), и тот факт, что старший байт был просто тэгом размещения и в адресных операциях не участвовал, намного легче жизнь нам не делал. Кстати, не очень понимаю, почему GCC не создает кода для x51, хотя для довольно таки близкой архитектуры AVR вполне работает.

Более того, многие приемы, повышающие быстродействие получения следующего значения станут недоступны при использовании указателя. А если еще учесть мнение, что указатели более сложны для понимания (не то, чтобы мне это мнение представлялось верным, но оно существует), то выбор однозначен — индексы.

А вот что именно должны показывать индексы — тут простор для фантазии безграничен в пределах размера буфера Мах (и даже более него), но практический смысл имеет весьма небольшой набор вариантов. Для индекса записи это две возможности: 1) указывать на место, куда был записан последний элемент и 2) указывать на место, куда будет записан следующий элемент. Поскольку сразу после создания очереди первый вариант выглядит несколько странно, то выбираем второй, тем более, что это сулит нам ощутимый выигрыш в будущем. Для индекса чтения сразу примем, что он указывает на элемент, который будет прочитан при очередном чтении. Сразу же возникает простой (в смысле проверки) критерий того, что очередь не пуста — индексы не равны. Но возникает вторая проблема — если мы запишем в очередь ровно Мах элементов, то индексы совпадут и мы не сможем отличить эту ситуацию от пустой очереди.

Первое решение данной проблемы («очевидное, всем понятное и простое неправильное решение») применялось множество раз и заключается в заведении счетчика количества помещенных в буфер элементов, либо в продвинутом случае флаге полноты. Почему я к нему отношусь неодобрительно — это 1) дополнительное место в памяти, 2) затраты времени на работу с ним (они невелики, но есть) 3) до наступления момента совпадения индексов значение счетчика избыточно, поскольку совпадает с разностью индексов, 4) в случае с размером буфера в 256 элементов счетчик должен быть длиннее индексов и может оказаться не нативным типом, 5) есть еще один недостаток (почти фатальный), но об этом позже. Как было сказано выше, частично можно ослабить указанные недостатки, организовав не счетчик, а флаг заполненности, но есть решение намного лучше.

Нам всего лишь следует не допускать ситуации, когда индексы могут совпасть после очередной записи (то, что они могут совпасть после чтения, очевидно), а для этого потребуется ограничить возможное количество элементов в буфере значением на 1 меньше возможного. Вот его реализация:

#define NeedOverflowControl YES

typedef uint8_t Data_t;

static Data_t BufferData[Max];
static Counter_t BufferWriteCounter=0, BufferReadCounter=BufferWriteCounter;

void BufferWrite(const data_t Data) { 
   BufferData[BuffWriteCounter] = Data;
   register counter_t Tmp = NextCount(BufferWriteCounter);
#if (NeedOverflowControl == YES) 
   if (Tmp == BufferReadCounter) {BufferOverflow();} else 
#endif
   { BufferWriteCounter = Tmp; }
};

В предыдущей функции есть небольшая неправильность, предлагаю найти ее и устранить самостоятельно, хотя… все таки есть, но продолжим:

 inline int BufferIsEmpty(void) { return ( BufferReadCounter == BufferWriteCounter ); };

inline int BufferIsFull(void) { return ( BufferReadCounter == NextCounter(BufferWriteCounter) ); };

#define DataSizeIsSmaller (sizeof(data_t) < sizeof(counter_t))
data_t BufferRead(void) { 
#if DataSizeIsSmaller
    register data_t Tmp = BufferData[BufferReadCounter];
#else
    register counter_t Tmp = BufferReadCounter;
#endif
    BufferReadCounter = NextCount(BufferReadCounter);
#if DataSizeIsSmaller
    return Tmp;
#else 
    return BufferData[Tmp];
#endif
};

Обратим внимание на ситуацию, в которой мы вызвали процедуру обработки переполнения (если мы поставили флаг необходимости обработки) — когда мы попытались записать последний незанятый байт буфера, мы не сообщили об этом продвижением индекса записи, соответственно, мы не сможем его прочитать — как я и предупреждал, при выбранном варианте реализации емкость буфера снижается на единицу. Также отметим, что мы сначала заносим очередной элемент в очередь, и только потом сообщаем об этом продвижением индекса, обратный порядок мог бы привести к весьма неприятным последствиям.

Прежде, чем посмотреть код с флагом, поговорим немного о переполнении — оно возникает, когда мы не можем поместить очередной элемент в буфер, и у нас есть различные способы решения возникшей проблемы, среди которых есть хорошие и так себе.

Прежде всего, (правильный и хороший) способ номер

1) не допускать подобной ситуации в принципе путем выбора правильного размера буфера (есть подвариант данного способа — увеличивать размер буфера при необходимости, но в мире встроенного программирования он не прижился, да и вообще выглядит сомнительно — раз нам пришлось увеличивать размер буфера, где гарантия, что не придется делать это снова и снова).
Следующий способ (правильный, но похуже, хотя все равно хороший) номер

2) сообщать о возникшем переполнении возвращаемым значением и приостанавливать запись в буфер — так называемая блокирующая запись, но он не всегда реализуем.

А вот дальше идут два неправильных и плохих способа:

3) и 4) игнорировать возникшую проблему и делать вид, что все хорошо («улыбаемся и машем»). Почему они плохие — потому что мы только делаем вид, что все отлично, на самом деле принцип Дирихле (задача о N клетках и N+1 птице) нарушить нельзя и мы теряем элемент данных, а два способа потому? что мы можем

3) потерять последний записанный элемент данных, а можем

4) потерять первый из еще не переданных элементов.

Какой их этих способов хуже — «оба хуже», хотя для конкретной задачи какой-то из них может оказаться более приемлемым, но главный недостаток неустраним — мы вынуждены терять информацию. Поэтому чаще всего применяют способ 3, поскольку его проще реализовать (для этого достаточно оставить неизменным индекс записи), что я и сделал в предыдущем примере, если обработка переполнения пуста.

Есть и еще один способ — вообще не контролировать ситуацию, (в нашем примере закоментировать строчку с дефайном, но не строчку с собственно проверкой), при этом мы

5) будем терять весь заполненный буфер — на первый взгляд он кажется самым плохим, поскольку потери самые большие, на самом деле это не вполне так, поскольку любая потеря данных есть зло, но у него есть несомненное преимущество — данный метод быстрее, поскольку вообще переполнение не контролирует.

Интересное наблюдение — быстрый поиск в Инете не обнаружил алгоритма восстановления данных в случае потери элемента, в отличие от случая искажения элемента, где прекрасно работают блоковые коды.

Вариант с флагом переполнения, на удивление, проигрывает по быстродействию совсем немного, если его правильно написать, но, тем не менее, проигрывает, причем по памяти мы, конечно, выигрываем один элемент, но нужно отвести место под флаг, так что экономия под вопросом. Вариант со счетчиком мы расматривать просто не будем, потому что я уже перечислил 4 его недостатка и наступило время припомнить пятый, как я и обещал, помимо фатального. В предложенной ранее реализации индексы обладают свойствами MRSW (Multi-Reader Single-Writer) по классификации «The Art of Mulpiprocessor Programming» (настоятельно рекомендую к прочтению, совершенно потрясающий труд) и в случае атомарных операций изменения индексов (для нативного типа) вообще не требуют никаких средств синхронизации. Необходимое и очень важное замечание — синхронизация не требуется только с точки зрения взаимодействия записи и чтения, обе функции ни в коей степени не являются реентрабельными и небезопасны с этой точки зрения, что важно помнить.

А вот счетчик будет иметь свойство MRMW и без синхронизации с ним работать просто нельзя, от слова «совсем» (если, конечно, Вашей целью не является создание «внезапно глючной» программы). Если же учесть тот факт, что мы пишем модуль для работы с периферией и, соответственно, запись либо чтение могут вызываться из прерывания, то вопрос синхронизации совершенно необходимо рассматривать. Что интересно, флаг, вроде бы имеющий аналогичные свойства, тем не менее допускает работу с ним без средств синхронизации (забавно, не правда ли, но имеет вполне научное объясние — операция изменения становится атомарной, а логика работы флага допускает, и даже навязывает, перекрытие записи), что иллюстрируется следующим фрагментом программы.

Обратим внимание, что подобный подход (флаг без средств синхронизации) возможен лишь при соблюдении определенных условий, которые следует тщательно проверять в Вашем случае. Подробности можно посмотреть в литературе, я их приводить не буду, поскольку данный способ организации буфера считаю не слишком хорошим, и привожу только для того, чтобы показать, что и не самое удачное решение можно реализовать аккуратно, а также продемонстрировать еще одну концепцию, которую считаю весьма полезной и которой намерен придерживаться.

typedef uint8_t data_t;
static data_t BufferData[Max];
static counter_t BufferWriteCounter=0, BufferReadCounter=WriteCounter;
static int8_t BufferHaveData = 0;

void BufferWrite(const data_t Data) { 
 if ((BufferWriteCounter == BufferReadCounter) && (BufferHaveDataFlag == 1)) {BufferOverflow();} else {
  BufferData[BufferWriteCounter] = Data;
  BufferHaveDataFlag = 1;
  BufferWriteCounter = NextCounter(BufferWriteCounter);
 };
};

inline int BufferIsEmpty(void) { return ((BufferReadCounter==BufferWriteCounter) && (BufferHaveDataFlag == 0));};

data_t BufferRead(void) { 
 register counter_t Tmp;
 Tmp = BufferReadCounter;
 BufferReadCounter = NextCount(BufferReadCounter);
 if (BufferReadCount == BufferWriteCounter) { BufferHaveDataFlag = 1; };
 return BufferData[Tmp];
};

Обратим опять внимание, что в процедуре записи мы сначала ставим флаг, а потом модифицируем индекс, а в процедуре чтения сначала проверяем индексы, а потом контролируем флаг, что опять таки бережет нас от неприятностей и чем то перекликается с обращением с ресурсами для исключения взаимной блокировки.

Вообще говоря, данный фрагмент следовало бы переписать в правильном стиле (с исключением магических констант 0 и 1, если Вы думаете, что это не магические константы, то Вы ошибаетесь), и если Вы будете использовать его, то так и делайте, я прячу исправленный код в спойлер, не потому, что стесняюсь, а чтобы не разжигать очередной раунд священной войны (бессмысленной и беспощадной), ненавистники перечислений, Вам не следует открывать данную кнопку,

остальным - можно
typedef (NoBufferHaveData= 0, BufferHaveData =1) BufferHave DataFlag_t;
BufferHaveData_t BufferYaveDataFlag;
inline void BufferHaveDataFlagSet(void) {BufferHaveDataFlag = NoBufferHaveData;};
inline void BufferHaveDataFlagClr(void) {BufferHaveDataFlag = BufferHaveData;};
inline int BufferHaveDataFlagIsSet(void) {return (int)(BufferHaveDataFlag == BufferHaveData);};


Что интересно, код для данного подхода будет ровно такой же, как и для прямых констант 0 и 1, но зато все предельно прозрачно и понятно и не оставляет места для толкований.Заранее соглашусь, что пример выглядит надуманным и, если наружу выдать только функции работы с флагом, то внутри вполне себе можно использовать константы 0 и 1. Все это верно, единственное, на чем я настаиваю, это именно на таком поведении флага, Вы можете назвать BufferFullFlag и поменять логику работы с ним, но ни в коем случае он не должен называться BufferIsNotEmptyFlag с вытекающими отсюда последствиями в виде загадочных логических операций. Еще раз подчеркну, принцип KISS неоднократно продемонстрировал свою безусловную верность и, если нас интересует, пуст ли буфер, мы так прямо и должны писать в программе, а не задавать вопрос, «не неполнен ли он».

В любом случае, я не считаю реализацию с флагом хорошей, поэтому настоятельно рекомендую смириться с незадействованным на весь размер буфером и принять реализацию с двумя индексами и без дополнительных полей.

Вообще то неожиданно обширный пост получился для столь простой темы, я думал еще про синхронизацию и критические секции написать, но это уже в следующий раз.

P.S. Ну и в заключение, что именно мне не понравилось в упомянутой библиотеке, но в то же время авторы отечественной РТОС это забрали к себе в код без малейших сомнений:

  1. Даны две реализации буфера — одна для размера степени двойки (надеюся, я показал, что это совершенно не нужно), вторая — для остальных случаев, но выбирать версию придется ручками, конечно, ошибиться не дадут, повсюду проверки.
  2. Сделаны совершенно не нужные методы вроде удаления последнего элемента или прямого доступа с элементу буфера.
  3. Буфер данных выравнивается на целое число.
  4. В реализации степени 2 ошибка в проверке заполненности.
  5. В реализацию произвольного размера затесался счетчик
  6. Совершенно не организованы критические секции, которые в правильной реализации (с двумя индексами) просто не нужны, но тут то без них не обойтись, применение вместо них атомарных операций явно недостаточно.
  7. Некоторая небрежность в стиле, строки вида

    return ((_writeCount - Atomic::Fetch(&_readCount)) &
    	(size_type)~(_mask)) != 0;

    особенно ее вторая половина — это как раз то, за что С ругают, причем сам язык не виноват, он лишь позволяет написать подобное вместо более понятного

    size_type(~(_mask))

    но не заставляет это делать.

P.P.S. Надеюсь, автор библиотеки согласится считать данную критику конструктивной и внесет соответствующие коррекции.

Комментарии (18)


  1. boblenin
    01.02.2019 17:41

    Интересно, а почему вопрос не ставится так: «Если затраты на разработку архитектуры НЕ кажутся Вам чрезмерными, подумайте, во сколько Вам может обойтись неправильная архитектура».

    Я возможно не прав, но правильную архитектуру можно спроектировать тогда, когда вы подобное уже строили, при этом условия рынка остались неизменными.


    1. GarryC Автор
      01.02.2019 17:49

      Есть у кого то из великих, вроде бы у Кнутта, фраза «Лучшее, что может случиться с программистом, это когда он теряет все исходники готовой и работающей программы и вынужден писать ее заново, но уже зная, как это должно быть сделано»


      1. boblenin
        01.02.2019 18:04

        Даже если это ваша цитата, все-равно не плохо, пусть и с оговорками :). Собственно в значительной мере начальный успех Open Source движения как раз в этом и заключался (90-е годы). Переписывали софт, который кто-то когда-то уже сделал.


      1. slonpts
        02.02.2019 08:10

        А потом начальство сообразит, что можно декомпилировать бинарники, и будет тот же архитектурный ужас, только еще имена локальных переменных теперь все __CS_jnfdUYGioj_1 да Ss$8nNcLxzaD, а приватные методы называются m1(), m2(), m3()


  1. firk
    01.02.2019 18:32

    Ужасно, как можно на такой элементарный вопрос написать столько текста.
    Ну и


    Ну и напоследок — как выделить младший бит числа (не знаю, зачем это может потребоваться, но вдруг пригодится)
    define LowerBit(N) ((((N) — 1) ^ (N)) & (N)).

    define LowerBit(N) ((N) & 1)

    А то что у вас — это не младший бит, а младший единичный бит.


    1. jcmvbkbc
      02.02.2019 01:32

      Ну и напоследок — как выделить младший бит числа (не знаю, зачем это может потребоваться, но вдруг пригодится)
      define LowerBit(N) ((((N) — 1) ^ (N)) & (N)).


      Для этого есть более короткая и понятная запись: #define LowerBit(N) (-(N) & (N))


    1. GarryC Автор
      02.02.2019 14:04

      Если бы я раз за разом на гитхабе не натыкался на элементарные ошибки, я бы не стал писать такой длинный текст. В этом посте я постарался объяснить, почему надо делать так, а не иначе.
      Ну и конечно же, младший значащий бит, спасибо за правку.


  1. vintage
    01.02.2019 23:13
    +2

    Пока вы занимались всеми этими микрооптимизациями, вы пропустили критическую ошибку из-за которой потребитель будет периодически читать мусор из очереди. Всё дело в том, процессор волен (и будет) переставлять независимые операции как угодно, а про эту зависимость он без барьеров памяти никак не сможет узнать:


    Также отметим, что мы сначала заносим очередной элемент в очередь, и только потом сообщаем об этом продвижением индекса, обратный порядок мог бы привести к весьма неприятным последствиям.

    Так что правильное смещение индекса должно выглядеть как-то так: https://github.com/nin-jin/go.d/blob/lock-free/source/jin/go.d#L261


    1. GarryC Автор
      02.02.2019 14:02
      -1

      Вы совершенно правы, я рассматривал достаточно простые архитектуры, а в продвинутых надо гарантировать порядок, но я предпочитаю барьеры атомикам.


      1. jcmvbkbc
        03.02.2019 07:22

        я предпочитаю барьеры атомикам

        Звучит как «я предпочитаю тёплое мягкому»: одних только барьеров недостаточно для реализации атомарных изменений памяти, сами по себе атомики не дают гарантий упорядочивания других обращений к памяти.


  1. jcmvbkbc
    02.02.2019 01:35
    +2

    Данное поведение переменной можно реализовать при помощи следующей конструкции:

    volatile int Counter = 0;
    Counter = (++Counter) % (Max+1);


    и именно такой код мы можем видеть в множестве (то есть весьма часто) случаев. Что тут неправильного

    Неопределённое поведение, школьный пример.


  1. CoolCmd
    02.02.2019 12:00

    например, ссылочная очередь

    это что за зверь?


    1. GarryC Автор
      02.02.2019 14:00

      Список с указателями на голову и хвост.


      1. CoolCmd
        02.02.2019 16:29

        связаный список?


  1. u_235
    02.02.2019 14:00
    -1

    AVR врядли научится переставлять инструкции. А на десктопе об этом должен компилятор беспокоиться.


    1. vintage
      02.02.2019 14:15
      +1

      Каким образом компилятор тут поможет? Расстановкой по барьеру после каждой инструкции?


      1. u_235
        02.02.2019 22:07

        Я упустил из виду многозадачность. А в однозадачной программе барьеры памяти не нужны.


        1. vintage
          02.02.2019 22:24

          Если не работать со внешними для процессора устройствами, то конечно.