Требования:
- Решение должно отрабатывать как можно быстрее (иначе в нем теряется смысл)
- Решение должно быть потокобезопасным
В результате у меня получилась функция (естественно в составе отдельного класса), которая возвращает true либо false (разрешение для выполнения).
Общий принцип таков. Есть коллекция отпечатков
Изначально я делал все на основе листов (List<>), но листы имеют динамический набор данных, а значит они расширяются по надобности и при добавлении новых элементов. А надо еще и удалять. Поэтому я решил использовать коллекцию заданного размера. Поэтому от листа решил отказаться в пользу массива. В результате у меня получился функционал круговой коллекции, основанной на массиве, поэтому в итоге я сделал круговую коллекцию.
Получился пул из отпечатков даты заданного размера на основе круговой коллекции. Класс пула содержит текущий элемент, в котором есть отпечаток даты и ссылка на следующий элемент. Изначально все даты имеют значение по умолчанию.
При запросе разрешения берется следующий от текущего элемент, и к его дате прибавляется 1 секунда. Если дата получается больше текущей даты, значит в пуле нет места и запрос возвращает ложь. Иначе в качестве текущего элемента устанавливается следующий, в новом текущем элементе устанавливается слепок даты и возвращается истина.
Теперь о потоках. Я не придумал ничего лучше, как поставить lock на функцию разрешения.
Вот и все.
Для тестирования я создал метод, который выполняет заданное количество запросов с заданным количеством потоков. Запустил его со следующими параметрами. Количество операций в секунду — 6, количество потоков — 4, количество запросов — 50.
Получились вот такие результаты:
> Ссылка на гитхаб
По скрину видно, что код отработал верно.
Комментарии (72)
skyramp
25.06.2017 18:38на какой диапазон допустимых значений для «количества операций в секунду» вы ориентировались, разрабатывая этот алгоритм? вы уверены что вам нужно именно точное решение этой задачи и не достаточно приближенного?
MoreBeauty
25.06.2017 18:48Я ориентировался на небольшие (меньше 100). Но в целом можно использовать и для бОльших значений. Тут все будет зависеть от конкретной задачи.
А почему Вы спросили про приближенное? Вообще мне нужно ограничить количество определенных запросов от пользователя конкретной цифрой.skyramp
25.06.2017 18:54основной вопрос — зачем вам хранить все таймстемпы запросов за последнюю секунду? почему бы не использовать leaky bucket?
MoreBeauty
25.06.2017 19:05Потому что я не вижу способа использовать здесь данный алгоритм. Но если вы знаете, подскажите.
skyramp
25.06.2017 19:17Берете счетчик, устанавливаете его в число rps. Допустим 10 — для 10 rps. Допустим последний запрос был в таймстемп t_last. Счетчик будет пополняться со скоростью 10 единиц в секунду (до максимального значения равного 10).
Когда приходит новый запрос в таймстемп t_current — увеличиваете счетчик на (t_current-t_last) * 10 и уменьшаете на 1. если получилось меньше 0 — значит количество запросов превышено. Если больше — то ок, сохраняете t_last = t_current.MoreBeauty
25.06.2017 19:27Я не понимаю. Устанавливаю счетчик на значение 10 (для 10 rps). Счетчик будет пополняться со скоростью 10 единиц в секунду до максимального значения равного 10. Он же уже 10. И зачем его дергать каждую секунду без надобности?
И как t_current (t_current-t_last) * 10 может получиться меньше 0?
Не понимаю короче.skyramp
25.06.2017 19:31Дергаете только когда новый запрос приходит. C_current = C_last-1+(t_current-t_last)*10. Достаточно хранить только t_last (таймстемп последнего запроса) и С_last (последний счетчик)
MoreBeauty
25.06.2017 19:44Что такое t_current и t_last? Если это отпечатки времени в миллисекундах, то
(t_current-t_last)*10 при разнице в полсекунды будет равно 5000.
Извините, может мне уже пора спать.skyramp
25.06.2017 19:49Вещественное число, в секундах. Ну если хотите чтобы было целое и в миллисекундах — то счетчик должен будет прирастать за 1000 мс на 10 единиц, т.е. на 0.01 единиц за 1 мс. Формула преобразуется в C_current = C_last-1+(t_current-t_last)*0.01
MoreBeauty
25.06.2017 20:06-2Не должен метод дергаться по таймеру. Да и с расчетами беда у вас. но общий смысл вроде уловил. Подумаю над этим завтра. Сегодня голова уже не варит.
skyramp
25.06.2017 20:36хм. whatever… ключевые слова в гугле: leaky bucket, token bucket. в качестве примера можно взять исходники модуля limit_req в nginx'е.
MoreBeauty
26.06.2017 04:45Все, я вас понял. Я попробовал сделать через ведро размером rps. Каждый запрос добавлял в ведро 1 и вычитал дельту времени, поделенную на rps и умноженную на 1000.
c_count = l_count + 1 — (c_time-l_time) / rps * 1000
Плюс данного метода в том, что нет пула, то есть нет расхода оперативной памяти и сложность инициализации О(1).
А вот минусы:
- (c_time-l_time) / rps * 1000 — теряет точность при rps кратному простому числу начиная с 3.
- Количество математических операций больше (то есть расчеты выполняются дольше чем в моем способе)
Так вот для моей задачи эти минусы критичны, а лишняя оперативка — нет.
MoreBeauty
26.06.2017 08:29Блин, как тут удалять комменты? :)
Конечно же, формула будет такой
c_count = Math.Max(l_count + 1 — (c_time — l_time) /1000 * rps, 0)
Минуса с неточностью, следовательно, не будет. Но вот по производительности надо будет проверить. К ней требования выше, чем к жору оперативной памяти.
MoreBeauty
26.06.2017 09:20В общем способ тут не подходит. Совсем.
Проблема в том, что в ведро приходит целая операция, а уходит какая-то ее часть (эквивалент дельты времени).
В результате из ведра может утечь половина, но реально количество выполненных за последнюю секунду операций останется неизменным. В итоге функция пропускает операции, которые не должна была пропустить.
ЗЫ Я прежде чем чего то предлагать людям, стараюсь проверить сам. Попробуйте и вы. На данный момент способ видится мне нерабочим.
kekekeks
25.06.2017 18:48+2Есть коллекция отпечатков DateTime
А теперь подумайте, что случится при смене системного времени по тем или иным причинам. Для таких целей следует использовать монотонный таймер. В .NET таковым является
Stopwatch
.
rPman
25.06.2017 20:22+1Как то усложнено, не так ли?
Достаточно хранить круговой массив из N дат последних запусков задач, если дата последнего больше периода — запускаете задачу, иначе ждете паузу в разницу между этим периодом и текущим временем.MoreBeauty
26.06.2017 04:48Да, если вы посмотрите предыдущие ревизии на гитхабе, то увидите, что до этого использовался как раз круговой массив из дат :)
Но мне еще важна сложность поддержки. Поэтому я сделал очевидный круговой набор данных.
MoreBeauty
26.06.2017 04:52Кстати, ничего ждать мне не нужно. Вместо ожидания функция просто возвращает false
omickron
26.06.2017 10:28На java я бы использовал BlockingQueue, в который раз в секунду клал нужное количество "разрешений".
Поток пробует взять "разрешение" из очереди. Если не может — ждёт или возвращает формальный ответ, смотря что требуется. Если может — полноценно выполняет запрос.MoreBeauty
26.06.2017 10:32Повезло вам.
omickron
26.06.2017 10:34В плане? Разве в C# нет аналога блокирующий очереди?
MoreBeauty
26.06.2017 10:45Я не знаю что делает BlockingQueue в Java, но в C# нет ничего, что мне бы подошло из коробки.
lisper
26.06.2017 11:17MoreBeauty
26.06.2017 11:23Вы знаете, весь код умещается у меня на ладошке. То есть его немного. Так что приведите конкретную реализацию. А до тех пор я не вижу плюсов. Из минусов сразу скажу, размер коллекции динамический, что значит, что при добавлении нового элемента происходит выделение памяти. А это ненужная мне дополнительная нагрузка.
omickron
26.06.2017 12:05Вы ошибаетесь:
— java.util.concurrent.ArrayBlockingQueue — реализация BlockingQueue — имеет статический размер.
— если же вы хотите динамически в runtime менять RPM, можно использовать реализацию java.util.concurrent.LinkedBlockingQueue.
— «разрешением» может быть что угодно, хоть один и тот же объект, вроде Boolean.TRUE, которому не требуется выделять дополнительную память.
Ну и про ладошку: мой код, конечно, лаконичнее, т. к. использует только стандартные библиотеки:
— одна строка на иницилизацию queue
— пара строк на null-проверку результата queue.poll()
— ну и ~ 10 строк на самостоятельное создание таймера на java. А в случае использования Spring это будет метод из одной строки. Не думаю, что в C# создание потока с таймером будет сильно длинее.
И никаких индексов ;)MoreBeauty
26.06.2017 12:07Я же вам сказал, повезло вам. Я в шарпе работаю. И ответил я не вам, а lisper'у :)
А про таймер, тут вы не правы ИМХО. Таймер = дополнительная нагрузка.omickron
26.06.2017 12:16Мне думается, что C# и Java сильно похожи в общем плане.
Смысл моего ответа в том, что вместо велосипеда лучше использовать стандартные инструменты, которые предоставляет язык и его внутренние библиотеки.
Это будет надёжнее и понятнее другим, кто будет поддерживать ваш код. В том числе и вам, через пару лет ;)
А про поток — ну если считать отдельный поток, который 99.9% времени спит, дополнительной нагрузкой — тогда да, соглашусь :)MoreBeauty
26.06.2017 12:41Стандартные инструменты подходят для большинства стандартных задач.
В моем случае задача не стандартная и стандартных инструментов без потери в производительности для нее нет. В первую очередь код должен соответствовать своей задаче, не выполнять ничего лишнего (особенно там, где каждый тик на счету), и только во вторую должен быть понятным.
Поток будет спать не 99,9% времени в данном случае. Хотя это зависит от его периодичности. Тут во избежание ошибок периодичность должна быть 1 такт в миллисекунду. И то этого мало, это урезает точность выполнения, когда за 1 миллисекунду происходит больше одного запроса. И кроме всего прочего таймер (в .NET по крайней мере) гарантирует выполнение не чаще, чем установленная периодичность, а вот реже вполне может. Значит точность будет еще ниже.
Но как обстоят дела на Java, уважаемый Senior Java Developer, я не знаю. тут вам виднее.omickron
26.06.2017 13:02Ну раз в C# всё так печально, как вы описываете, действительно, мне повезло в своё время с выбором ;)
MoreBeauty
26.06.2017 13:14Или вы не привыкли замерять производительность вашего кода.
omickron
26.06.2017 13:44:)
Думаю, об этом имеет смысл дискутировать, говоря об одной и той же реализации в одном и том же языке.MoreBeauty
26.06.2017 13:47Покажите код вашего таймера
omickron
26.06.2017 14:04Я не знаю тонкостей C#, поэтому полагаюсь на ваше мнение, что для этой задачи нельзя обойтись стандартными средствами и приходится писать своё.
А может, ваша задача настолько специфична, но я не знаю всех деталей?
Код таймера тривиальный:
TImeUnit.SECONDS.sleep(1); queue.putAll(collection);
Кстати, мне пришло в голову ещё более простое решение на AtomicInteger:
— раз в секунду делать atomic.set(rpmCount);
— по запросу если atomic.decrementAndGet() отрицательный, не выполнять его.
Ни очередей, ни массивов — лишь один счётчик.MoreBeauty
26.06.2017 17:16Я не уверен, как в яве разруливаются потоки, но в .NET количество активных потоков не может превышать количество ядер процессора. Если потоков запущено больше, то они будут чередоваться (один уснул, другой проснулся). То есть если я запускаю таймер с интервалом 1мс и в системе запущено еще несколько потоков, то таймер может не проснуться вовремя, так как другие активные потоки могут надолго зависнуть и не отдавать эстафету системе (среде выполнения). Почему то мне кажется, что это ограничение железа и системы по-другому обойти проблематично и в яве все устроено так же.
В любом случае вы можете это проверить, запустив 100 потоков параллельно на цикл 1-100 с выводом счетчика и номера потока.
Но даже если таймер в яве работает точно как швейцарские часы, появляется просадка по точности. И точность эта 1мс. Хотя за 1 мс может быть выполнено довольно много запросов.
В моем коде на данный момент то же самое ограничение, но я легко могу его преодолеть, заменим штамп с ElapseMilliseconds на ElapsedTicks. Количество тиков за миллисекунду зависит от процессора и настроек системы. На моем домашнем (слабеньком) компе это значение равно около 300000. То есть точность будет максимальной (за 1 тик точно больше одной операции не будет выполнено).
Как то так.
А про AtomicInteger я не в курсе. В шарпе такого нет. Так что я не понимаю как это работает.Szer
27.06.2017 11:07Я не знаю что делает BlockingQueue в Java, но в C# нет ничего, что мне бы подошло из коробки.
А про AtomicInteger я не в курсе. В шарпе такого нет.Вам бы книжку какую почитать, а то получается что Вы не в курсе, а виноват почему-то C#
BlockingCollection Overview
Interlocked OperationsMoreBeauty
27.06.2017 11:09Очень замечательно. А теперь предложите решение на основе BlockingCollection, которое не проигрывает по скорости.
Szer
27.06.2017 13:06Проигрывает по скорости чего? Скорость выполнения CanExecute()?
Нужен критерий перформанса.
Через коллекцию токенов можно посмотреть как в Akka.Net реализован Throttle.
А через атомарные операции и таймер:
https://pastebin.com/fZ7EBGe3
Вызов таймера выполняется через ThreadPool
MoreBeauty
27.06.2017 13:10Да хватит пихать сюда таймер! Я уже описал пробелмы таймера. Если ThreadPool не найдет свободного потока, то таймер будет опаздывать! И это при том, что частоты 1 раз за 1мс и так недостаточно!
Szer
27.06.2017 13:20Тогда могу сказать что в Вашей задаче не хватает требований, которые Вы придумываете на лету, и которые никто кроме Вас не знает.
Способов реализации того, что описано в начале статьи (2 пункта) — масса. В том числе с помощью стандартных инструментов, с которыми Вы незнакомы.
Что в данном случае понимается под скоростью — остаётся загадкой.
MoreBeauty
27.06.2017 13:25Под скоростью в данном случае я имею ввиду время выполнения CanExecute()
Требования — она должна выполняться максимально быстро. Если в моем решении она выполняется быстрее, чем в вашем, вы проигрываете.
Я много с чем не знаком в платформе .NET. Как и вы, думаю. И если бы данная коллекция могла бы мне помочь, я бы с ней познакомился. Потому что я, прежде чем «изобретать велосипед», прочитал описание всех коллекций, которые смог найти. На некоторых останавливался чтобы изучить подробнее. И ничего мне не подошло.Szer
27.06.2017 13:36Под скоростью в данном случае я имею ввиду время выполнения CanExecute()
Тогда спешу сообщить, что на моей машине вызов CanExecute() из моего решения с атомарными операциями занимает 12нс, а из вашего — 42нс.
Измерял BenchmarkDotNet.
MoreBeauty
27.06.2017 13:46Но я так понимаю в CanExecute() вы только добавляете слепок времени? А очистка происходит по таймеру?
Szer
27.06.2017 13:51Но я так понимаю в CanExecute() вы только добавляете слепок времени? А очистка происходит по таймеру?
Если что код я выложил: https://pastebin.com/fZ7EBGe3
В CanExecute() происходит атомарный декремент числа (никаких походов в heap, всё на стеке). Если число меньше 0, то атомарно инкрементим обратно.
По таймеру происходит обратное. Инкрементим, и если токенов стало очень много — декрементим.
Переменную можно сделать volatile чтобы она случайно незакешировалась в проце.
skyramp
27.06.2017 14:05тут даже наверное и тред с таймером не нужен — можно обрабатывать возможное срабатывание таймера в том месте где запрос пришел (непосредственно в CanExecute). но опять таки, автору это решение как я понимаю не подходит, т.к. например в ситуации ограничения на 1 рпс последовательность «putToken -> sleep(0.9с) -> request -> sleep(0.1с) -> putToken -> sleep(0.1с) -> request» выполнит оба реквеста и между ними интервал будет всего 0.2с, что не вписывается в ограничение по рпс. нужно ли именно такое ограничение на самом деле и стоит ли ради его точного соблюдения тратить лишнюю память на хранение каждого таймстемпа запросов за последнюю секунду — это уже вопрос к автору
MoreBeauty
27.06.2017 14:09Что я должен передать в конструктор в качестве perInterval?
Szer
27.06.2017 14:16Что я должен передать в конструктор в качестве perInterval?
Допустим TimeSpan.FromSeconds(1)
MoreBeauty
27.06.2017 16:51Ваша реализация ломает ограничение на количество операций в секунду.
Szer
27.06.2017 17:21Ваша реализация ломает ограничение на количество операций в секунду.
Нет, не ломает. Желательно скриншот и объяснение.
MoreBeauty
28.06.2017 04:34Нет, не ломает. Желательно скриншот и объяснение.
Фактически, то же самое предложил skyramp только без таймера. Математическая модель та же. Вместо таймера мы просто замеряем дельту времени между запросами при выполнении CanExecute(). То есть у вас происходят вычисления каждые Х мс, а без таймера просто при запросе эти вычисления умножатся на на количество мс между запросами.
Так вот такое решение на том же самом тесте, на котором я тестировал свое, пропустило 8 задач вместо 6. Все дело в том, что запрос при таком подходе оценивается как усредненное значение времени. Это будет работать только в том случае, если запросы будут поступать каждые s / rps секунд. То есть такой подход ломает ограничение. Более того, он позволит пользователям обойти его намеренно.
lisper
26.06.2017 18:39Точно вижу один плюс — использование стандартных средств языка, а не написание своего велосипеда. Экономия на выделении памяти под элементы — экономия на спичках. Никто не запрещает перед основной работой сразу создать нужное количество элементов. Кстати, не очень понятно, зачем в условии задачи привязка к количеству запусков в секунду. Очень похоже, что это переформулированная задача об одновременном существовании определенного количества объектов в нескольких потоках. Для таких вещей придуманы семафоры. В частности в шарпе есть модификация SemaphoreSlim, которая позволяет задать сколько именно потоков будут иметь параллельный доступ к ресурсу. И нет, писать код за вас я не стану. На MSDN отличный пример для SemaphoreSlim
MoreBeauty
26.06.2017 18:59Я уже где то отвечал на это. У меня задача не стандартная и стандартные средства тут не годятся.
Экономия на выделении памяти под элементы — экономия на спичках.
Я не экономлю на памяти как раз.
Никто не запрещает перед основной работой сразу создать нужное количество элементов.
Да, я именно так и делаю.
Кстати, не очень понятно, зачем в условии задачи привязка к количеству запусков в секунду. Очень похоже, что это переформулированная задача об одновременном существовании определенного количества объектов в нескольких потоках.
Есть API, некоторые функции для каждого пользователя ограничены количеством выполнений в секунду. Ограничение это взято из статистики. В итоге сервер успевает обработать все входящие запросы и пользователи не ждут в очереди, пока отработают куча ненужных запросов других пользователей, которые, кстати, могут так наспамить, что мало не покажется. А так им придется организовывать работу под ограничение. Ну и плюс это какая никакая защита от ddos.
Для таких вещей придуманы семафоры. В частности в шарпе есть модификация SemaphoreSlim, которая позволяет задать сколько именно потоков будут иметь параллельный доступ к ресурсу. И нет, писать код за вас я не стану. На MSDN отличный пример для SemaphoreSlim
Вы мне рассказываете о чем моя задача? Зачем? Я не писал изначально о ее предназначении, потому что это не имеет значения.
Nekroido
26.06.2017 10:52Есть, BlockingCollection.
MoreBeauty
26.06.2017 10:53Она никак не помогает решить мою задачу.
Nekroido
26.06.2017 11:15Товарищ omikron описал очень интересный алгоритм (который я возьму на вооружение). Раз в секунду в коллекцию кладётся нужное количество "разрешений". Например, всего 10 раз в секунду, метод в прошлую секунду выполнился 7 раз, значит в коллекции осталось 3 разрешения. Другой поток проходится по этой коллекции и "спит", если количество разрешений равно нулю. Особенность BlockingCollection в том, что цикл не заканчивается, если коллекция пустая.
Алгоритм очень лаконичный. Из математики там только одна операция вычитания.MoreBeauty
26.06.2017 11:26Я тут рассуждаю про минимизацию нагрузки, а вы собираетесь вешать на функцию таймер! Еще и нагрузить выделением памяти для добавления нового элемента в очередь.
omickron
26.06.2017 11:45Вы достаточно чётко описали задачу и требования.
Моё решение, как и ваше, имеет сложность O(1) по скорости и O(N) по памяти. Т. к. количество запросов в секунду предполагается небольшим, O(N) по памяти мне кажется допустимым.
Плюс моего решения в гибкости: вы можете в runtime менять количество операций в секунду.MoreBeauty
26.06.2017 12:04Я вам мне про сложность, а про динамические коллекции. Вы забываете про выделение памяти для добавляемых элементов. В результате при сложности O(1) у вас выполняется инструкций больше, чем вы рассчитывали. Помножим это на количество обращений, в итоге картинка получится не радужная.
omickron
26.06.2017 12:10Какую коллекцию использовать: динамическую или статическую, решать разработчику. Я об этом написал выше.
Хотите статику — используйте коллекцию на массиве.
Хотите динамически настраивать — используйте связый список, в котором добавление/удаление элементов практически бесплатно.MoreBeauty
26.06.2017 12:16Зачем мне использовать массив, если описание элемента связного кругового списка в моем случае занимает 3 строки и вносит ясность? К тому же не требует больше никакой логики на проверку переполнения. К тому же ничего не нужно удалять/добавлять.
Не всегда лаконичный код является наиболее верным. Далеко не всегда.
А выделять дополнительный поток для таймера — это вообще моветон! Я тут на копейках экономлю, а вы предлагаете косарик вложить :)
Nekroido
26.06.2017 16:05По моему скромному опыту реализация цикла даже с такой проверкой сильно бъёт по процессорному времени и нужно это дело искусственно ограничивать через Thread.Sleep(). Уберите его в своих тестах и посмотрите что произойдёт. Уверен, погоня за памятью сразу уйдёт на второй план.
MoreBeauty
26.06.2017 18:47Зачем убирать Thread.Sleep()? у меня же запросы будут не сплошным потоком. Я же не файлы копирую :)
Между запросами в любом случае будет пауза. Хоть короткая, но будет.Nekroido
26.06.2017 20:08По дороге с работы подумал над конкретными примерами применения обоих алгоритмов. Ваш стоит использовать для контроля количества запросов к API в мобильном приложении. Вариант с BlockingCollection я возьму для демона, который отправляет на вебхук агреггированные данные.
Собственно, да, разные цели и разные приоритеты — память или процессорное время. :)
IL_Agent
26.06.2017 13:24На RX можно так попробовать:
//исходная последовательность событий IObservable<MyEvent> source= ...; //ограничения var count = 5; TimeSpan period = TimeSpan.FromSeconds(1); //"обрезанная" последовательность IObservable<MyEvent> cuttedStream = source .WithLatestFrom(Observable.Interval(period).StartWith(-1), (src, timer) => new { src, timer }) .GroupBy(p => p.timer) .SelectMany(p => p.Take(count)) .Select(p => p.src);
MonkAlex
MoreBeauty
Что? Нет. Эти две величины вообще между собой не связаны. Количество операций в секунду это ограничение, выставленное мной для барьера, а количество потоков относится к тестовому методу. Метод запускает проверку на 4х потоках.
И да, 6х4=24 :)