Давным давно, когда деревья компьютеры были большими, а процессоры одноядерными, все приложения запускались в один поток и не испытывали сложностей синхронизации.


Современные же приложения стремятся использовать все имеющиеся ресурсы, в частности, все доступные CPU.


К сожалению, использовать стандартные структуры данных при многопоточной обработке не представляется возможным, поэтому в Java 5 появились потокобезопасные структуры данных,
т.е. функционирующие исправно, при использовании из нескольких потоков одновременно, и расположились они в пакете java.util.concurrent.


Про Vector...

На самом деле, потокобезопасные, но неэффективные, структуры данных, как, например, Vector и Hashtable, появились еще в Java 1.0.
В настоящий момент, они не рекомендуются к использованию.


Однако, не взирая на всю технологическую мощь, заложенную в пакет java.util.concurrent, обработка информации потокобезопасными коллекциями возможна лишь в рамках одного компьютера, а это порождает проблему масштабируемости.


А что если нужно, в реальном времени, обрабатывать информацию о 100 миллионах клиентов,
когда датасет занимает 100Тб, а каждую секунду нужно совершить 100+ тысяч операций?
Вряд ли это возможно, даже на самом крутом современном железе, а если и возможно — только представьте себе его стоимость!


Намного дешевле добиться такой же вычислительной мощности объединив множество обычных компьютеров в кластер.



Остается лишь вопрос межкомпьютерного взаимодействия привычными средствами, схожими по API с потокобезопасными коллекциями из пакета java.util.concurrent и дающими те же гарантии, но не на одном компьютере, а на всем кластере.



Такие возможности и гарантии могут дать распределенные структуры данных.


Рассмотрим некоторые из распределенных структур данных, позволяющих, без особых сложностей, сделать из многопоточного алгоритма распределенный.


Дисклеймер

Рассматриваемые в дальнейших примерах, реализации распределенных структуры данных являются частью функционала распределенного кеша Apache Ignite.


AtomicReference и AtomicLong


IgniteAtomicReference предоставляет compare-and-set семантику.


Предположим, есть 2 компьютера, связаных общей сетью.


Запустим Apache Ignite на обоих (предварительно подключив библиотеки)


// Запустим экземпляр (node) Ignite локально.
// В зависимости от конфигурации, node станет частью кластера хранящего и обрабатывающего данные,
// либо клиентской node, позволяющей иметь доступ к этому кластеру.
Ignite ignite = Ignition.ignite();

// Создадим или получим, ранее созданный, IgniteAtomicReference
// со стартовым значением "someVal"
IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "someVal", true);

На обоих компьютерах попробуем изменить хранимое значение


// Изменим значение если текущее соответствует ожидаемому.
boolean res = ref.compareAndSet("someVal", "someNewVal"); 

// Изменение, в рамках кластера Ignite, произойдет.
// Первый вызов изменит значение, и res будет равно true, 
// Второй вызов получит res равное false, т.к. текущее значение уже не равно "someVal"

Восстановим оригинальное значение


ref.compareAndSet("someNewVal", "someVal"); // Изменение произойдет.

IgniteAtomicLong расширяет семантику IgniteAtomicReference добавляя атомарные increment/decrement операции:


// Создадим или получим, ранее созданный, IgniteAtomicLong.
final IgniteAtomicLong atomicLong = ignite.atomicLong("atomicName", 0, true);

// Выведем инкрементированное значение.
System.out.println("Incremented value: " + atomicLong.incrementAndGet());

Подробная документация: https://apacheignite.readme.io/docs/atomic-types


Примеры на github



AtomicSequence


IgniteAtomicSequence позволяет получать уникальный идентификатор, причем уникальность гарантируется в рамках всего кластера.


IgniteAtomicSequence работает быстрее IgniteAtomicLong, т.к. вместо того чтобы синхронизироваться глобально на получении каждого идентификатора, получает сразу диапазон значений и, далее, выдает идентификаторы из этого диапазона.


// Создадим или получим, ранее созданный, IgniteAtomicSequence.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);

// Получим 20 уникальных идентификаторов.
for (int i = 0; i < 20; i++) {
  long currentValue = seq.get();
  long newValue = seq.incrementAndGet();  
  ...
}

Подробная документация: https://apacheignite.readme.io/docs/id-generator
Пример на githubIgniteAtomicSequenceExample


CountDownLatch


IgniteCountDownLatch позволяет синхронизировать потоки на разных компьютерах в рамках одного кластера.


Запустим следующий код на 10 компьютерах одного кластера


// Создадим или получим, ранее созданный, IgniteCountDownLatch
// установив значение счетчика в 10
IgniteCountDownLatch latch = ignite.countDownLatch("latchName", 10, false, true);

// Декрементируем счетчик
latch.countDown();

// Дождемся пока countDown() будет вызван 10 раз
latch.await();

В результате, все latch.await() выполнятся гарантированно позже того, как выполнятся все десять latch.countDown().


Подробная документация: https://apacheignite.readme.io/docs/countdownlatch
Пример на githubIgniteCountDownLatchExample


Semaphore


IgniteSemaphore позволяет лимитировать число одновременных действий в рамках одного кластера.


// Создадим или получим, ранее созданный, IgniteSemaphore
// установив значение счетчика в 20
IgniteSemaphore semaphore = ignite.semaphore("semName", 20,  true,  true);

// Получаем разрешение
semaphore.acquire();

try {
    // Семафор захвачен, возможно выполнение кода
}
finally {
    // Возвращаем разрешение
    semaphore.release();
}

Гарантируется, что, одновременно, не более 20 потоков, в рамках одного кластера, будут выполнять код внутри секции try.


Подробная документация: https://apacheignite.readme.io/docs/distributed-semaphore
Пример на githubIgniteSemaphoreExample


BlockingQueue


IgniteQueue предоставляет те же возможности, что и BlockingQueue, но в рамках целого кластера.


// Создадим или получим, ранее созданный, IgniteQueue.
IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);

Попытаемся получить элемент из очереди


// Получим первй элемент в очереди
queue.take();

Выполнение приостановится на queue.take() до тех пор пока, в рамках того же кластера, не произойдет добавление в очередь


// Добавим объект в очередь
queue.put("data");

Подробная документация: https://apacheignite.readme.io/docs/queue-and-set
Пример на githubIgniteQueueExample


Вместо заключения


В связи с тем, что статья получилась исключительно обзорной, а многим, наверняка, интересно как же все это работает под капотом — в следующей статье я рассмотрю особенности реализации каждой из распределенных структур данных описанных в данной статье.

Поделиться с друзьями
-->

Комментарии (13)


  1. Crandel
    24.05.2017 14:54

    А что если нужно, в реальном времени, обрабатывать информацию о 100 миллионах клиентов,
    когда датасет занимает 100Тб, а каждую секунду нужно совершить 100+ тысяч операций?
    Вряд ли это возможно, даже на самом крутом современном железе, а если и возможно — только представьте себе его стоимость!

    А не лучше ли использовать Hadoop Map Reduce, Spark или Hive/Pig для таких задач?


    1. randoom
      24.05.2017 15:15
      +1

      Все перечисленные системы накладывают ряд ограничений связанных с парадигмой Map Reduce.
      В случае же с набором, описанным в статье, таких ограничений нет.
      Я бы, даже, сказал что это совершенно разные подходы к решению совершенно разных задач :)

      Например, у вас есть сервер и 1000 бакноматов, вам нужно чтобы все они дождались инициализации сервера.
      Очень простое, с точки зрения дизайна, решение (в одну строку!):
      Вы можете на каждом из них ждать latch.await(), а на сервере сделать latch.countDown().


      1. Crandel
        24.05.2017 15:20

        Тогда вам лучше поменять пример в статье, потому что для 100 Тб датасета и вычислений над ним как раз лучше всего подходит концепция MapReduce и вышеперечисленные технологии.


        1. randoom
          24.05.2017 15:31
          +1

          Я говорю не про вычисления, а именно про операции над данными.
          А операции бывают разные,
          Например, возьмем, все те же, 1000 банкоматов и раз в час нужно разыграть приз — первый кто оплатит на сумму больше тысячи — победитель.
          Подвох в том, что человек должен узнать что он победитель сразу после завершения операции,
          Возможны ли подобные гарантии/способы синхронизации на описанных технологиях?


          1. Crandel
            24.05.2017 15:40
            +1

            Тогда не понятно, зачем это должно обрабатываться одним кодом на сервере и банкоматах? Чем клиент-серверная архитектура не устраивает? Тем более что тысяча банкоматов для одного сервера — вообще не проблема. Может приведете более актуальный пример?


            1. randoom
              24.05.2017 15:47
              +1

              Да, продолжим с 1000 банкоматов.
              С каждого из них может поступить запрос на перевод денег, но заранее известно что можно совершать не более 20 переводов одновременно (ограничение в договоре с платежной системой).
              Представим также, что сервер у нас не один, а 10 и к каждому из них приписано по 100 банкоматов.

              вариант

              semaphore.acquire();
              doPayment(acc1, acc2, amount);
              semaphore.release();

              позволит ограничить нагрузку на платежную систему в соответствии с условиями договора в рамках всей системы.


              1. Crandel
                24.05.2017 15:49
                +2

                Вот теперь стало понятнее, спасибо


              1. krupt
                25.05.2017 12:52

                Немного не нравится мне Ваш пример. Как правило вводят ограничение на количество операций в какой-то промежуток времени.
                Есть в Ignite что-нибудь для ограничения одновременного выполнения в минуту/час/день?


                1. randoom
                  25.05.2017 12:54

                  Да, конечно, как вариант, вы можете использовать IgniteAtomicLong и выполнять операцию только если после инкремента получили число не превышающее заданное.
                  А раз в минуту/час/день сбрасывать его значение в ноль из отдельного потока.


  1. gurinderu
    25.05.2017 13:52
    +2

    А как медленно работает IgniteAtomicSequence на большом кластере?


    1. randoom
      25.05.2017 14:18
      +1

      Если кратко, то — размер кластера не важен, чем больше диапазон значений, выделяемых на один локальный экземпляр IgniteAtomicSequence, тем быстрее, стремясь с скорости простого чтения из памяти.

      Размер кластера не имеет значения, т.к. состояние будет хранится всего на 2-х node (Primary и Backup).
      Важно лишь — сколько будет запросов на изменение состояния в рамках одного кластера в единицу времени.
      Если диапазон значений, выделяемых на одну node, большой — то запросов будет мало, как следствие — не будет контеншена на глобальном изменении состояния IgniteAtomicSequence и получение нового диапазона будет занимать минимум времени.

      Итого, размер кластера не важен, а увеличение числа «клиентов» всегда можно компенсировать увеличением диапазона выделяемых значений.


      1. gurinderu
        25.05.2017 15:13
        +1

        Мне скорее интересно, сколько времени тратится на согласование диапазонов? Ведь несколько нод могут попросить один и тот же диапазон.


        1. randoom
          25.05.2017 15:50
          +1

          Подробнее как это работает я планирую рассказать в следующей статье, но, вкратце:

          Допустим, есть 10 локальных экземпляров IgniteAtomicSequence (по одном у и тому же имени, т.к. привязанные к одному глобальному состоянию).

          При создании каждого экземпляра ему выделается, допустим, 100 идентификаторов.
          Итого, первому дают номера с 0 по 99, второму с 100 до 199 и т.д.

          Глобально хранится информация что было выдано 1000 идентификаторов, т.е. не сколько реально было запрошено, а сколько доступно к выдаче (или уже было выдано) на всех локальных экземплярах.

          И, когда, какой то из локальных экземпляров выдает все свои 100 — он запрашивает новые 100 и глобально фиксируется что было выдано не X, а X+100 (например 1000 -> 1100).
          Для каждого следующего случая окончания диапазона ситуация повторяется.

          Единственным минусом подобной имплементации выступает сложность определения сколько идентификаторов реально используется, а сколько томится в ожидании, но уже отмечено как выданные.