image


Всё больше набирает популярность паттерн использования каналов при создании
многопоточных приложений. Идея не нова, ее дизайн заложен ещё в далёком 1978 году
в виде CSP.Наиболее известная реализация сейчас повсеместно используется в Golang.


Мы же в статье рассмотрим реализацию CSP в core.async для Clojure, если интересно, добро пожаловать под кат.



В статье будут рассмотрены простые и базовые практики для работы с core.async, описанного в статье будет достаточно для хорошего старта в многопоточное программирование.


В отличии от Golang где парадигма работы с потоками через каналы встроена в сам язык, core.async является просто библиотекой для Clojure, если вам импонирует другая парадигма, то выбор есть: pulsar, promesa, manifold


При этом core.async и promesa можно также использовать на стороне браузера в ClojureScript, естественно в этом случае ни о какой многопоточности говорить не приходится, так как все это добро компилируется в ES5 и исполняется в браузере, но знакомый интерфейс и удобная работа с асинхронностью может хорошо послужить.


Так что же нам дает core.async? Если объяснять на пальцах то core.async нам предоставляет диспетчеризацию через go-блоки в свой фиксированный Thread Pool состоящий из 8 тредов (размер Thread Pool можно менять через специальную опцию). При поступлении сообщения в канал, core.async сам найдет свободный поток, и передаст ему задачу, либо поставит сообщение в очередь. Кто впервые слышит про Thread Pool можно почитать хорошую заметку по паттерну Worker Thread




Пример № 1


(defonce log-chan (chan))

(defn loop-worker [msg]
  (println msg))

(go-loop []
  (let [msg (<! log-chan)]
    (loop-worker msg)
    (recur)))

В примере выше, мы создали канал log-chan, и определили функцию loop-worker которая будет обрабатывать сообщения из канала.Затем создали go-block с бесконечным циклом, поместив туда наш loop-worker.Теперь мы можем отправить данные в канал: (>!! log-chan "привет")


Функция loop-worker была вынесена отдельно за go-block умышленно, ради удобной ее отладки через REPL.


Само тело go-loop так как это макрос запекается где то внутри core.async, и перекомпиляция его на лету в REPL носит странный характер, поэтому обработчик проще вынести отдельно и жить спокойно.


Тут стоит заметить что никакого бесконечного цикла в привычном его понимании go-loop не делает.
После получения сообщения, происходит разовое выполнения функции обработчика, а затем go-block паркуется функцией <! которая будет ждать нового сообщения. Таким образом можно создавать сколько угодно много каналов и обработчиков к ним.


В пределах go-блока функция чтения из канала <! осуществляет парковку потока.
За пределами go блока есть возможность использовать для чтения из канала функцию <!! которая блокирует основной поток до получения сообщения. Поведение <!! можно сравнить с функцией await в ES7.


Parking go блока, это термин core.async означающий, что поток освобожден, и доступен для других задач. Также существует термин blocking, который означает что поток будет непосредственно заблокирован и недоступен для новых задач до его освобождения.


В примере №1 есть изъян, если в loop-worker будет вызван Exception, то произойдет прерывание выполнения формы, и (recur) никогда не будет вызван, следовательно ожидание данных из канала log-chan прекратится, исправим это в примере № 2.


Пример № 2


(defonce log-chan (chan))

(defn loop-worker [msg]
  (throw (Exception. "my exception message")))

(go-loop []
  (let [msg (<! log-chan)
        res (try
              (loop-worker msg)
              :ok
              (catch Exception e
                (println (.getMessage e))
                :error))]
    (recur)))

В этом примере мы обернули весь вызов loop-worker в форму try, а переменная res, будет содержать флаг, сообщающий об успешном выполнении формы или же об ошибке. Этот флаг может пригодится, например, если мы захотим закрыть канал в случае ошибки. Рабочий пример этого подхода можно посмотреть тут


Пример № 3


  (let [c1 (go (<! (timeout (rand-int 1000))) 5)
        c2 (go (<! (timeout (rand-int 1000))) 7)]
    (go (let [v1 (<! c1)
              v2 (<! c2)]
          (println {:v1 v1
                    :v2 v2
                    :summ (+ v1 v2)}))))

Данный пример будет ждать результата от всех асинхронных операций перечисленных в блоке let. Эта практика очень удобна для решения проблемы callback hell в JavaScript, и очередной повод порадоваться что это можно использовать на стороне браузера в лице ClojureScript.


Пример № 4


(defn upload 
 "upload emulator"
  [headshot c time]
  (go (Thread/sleep time)
      (>! c headshot)))

  (let [c1 (chan) c2 (chan)]
    (upload "pic1.jpg" c1 30)
    (upload "pic2.jpg" c2 40)
    (let [[headshot channel] (alts!! [c1 c2 (timeout 20)])]
      (if headshot
        (println "Sending headshot notification for" headshot)
        (println "Timed out!"))))

В этом примере мы создали функцию upload эмулирующую асинхронную операцию, в данном случае загрузку файла. Последним аргументом upload, принимает время задержки в миллисекундах. С помощью функции alts!!! мы можем получить первый же результат, который нам вернет один из перечисленных в векторе каналов. В нашем векторе, последним каналом идет (timeout 20), этот канал нам вернет результат через 20 миллисекунд, и это будет первым значением которое будет записано в переменную headshot и будет продолжено выполнение формы. Таким образом данный пример эмулирует установку времени на timeout, в течении которого мы будем ждать выполнения набора асинхронных операций.


Пример № 5


(def ping (chan))
(def pong (chan))

(go-loop []
  (let [msg (<! ping)]
    (when (= msg :ping)
      (println msg)
      (>! pong :pong)
      (Thread/sleep 1000))
    (recur)))

(go-loop []
  (let [msg (<! pong)]
    (when (= msg :pong)
      (println msg)
      (>! ping :ping)
      (Thread/sleep 1000))
    (recur)))

(>!! ping :ping)

Пример общения двух каналов, классический Ping-Pong.


Это был последний пример который я хотел показать. Отдельно так же стоит выделить наличие в clojure типов данных, созданных специально для записи туда информации в несколько потоков, это atom и agent а также общую иммутабельность остальных типов, всё это очень облегчает жизнь разработчика при разработке многопоточного приложения.




Полезные ссылки:


» http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html
» https://github.com/clojure/core.async
» https://github.com/clojure/core.async/wiki/Getting-Started
» http://www.braveclojure.com/core-async/
» http://go.cognitect.com/core_async_webinar_recording
Поделиться с друзьями
-->

Комментарии (9)


  1. rotor
    15.10.2016 22:41
    +2

    мене, мене, текел, упарсин


  1. dagrotar
    15.10.2016 23:04

    Перед мной стоит задача запустить несколько потоков слежения за некими сущностями в сети. Сущностей может быть много(10,20,100). А частота опроса не должна сильно плавать. При всем при этом мы хотим этим сущностям посылать команды и контролировать результат выполнения команды.
    Первое желание сделать это через цикл в го форме с двумя каналами (in/out). Но так как они будут выполняться в общем пуле потоков мы можем получить залипание отдельных циклов.
    Можно запускать циклы в future (или вовсе оборачивать в Thread) а каналы реализовывать через общие атомы. Вот только в таком случае нам придется часть функционала core.async переписывать своими ручками.
    Вот и как быть? :)


    1. seryh
      15.10.2016 23:19

      В идеале ваши "сущности" стоило бы научить самих связываться с сервером. Блокировать долгими операциями go-блок не рекомендуется. Но можно найти баланс, создавайте каналы относительно количества "сущностей", например 1 канал на 10 "сущностей". Как описано в статье, количество каналов может быть любым, никакого залипания не будет, задания которым не хватило свободных тредов будут поставлены в очередь, главное что бы эта очередь своевременно просасывалась, если не хватает тредов можно посмотреть в сторону ограничения времени опроса по таймауту, либо увеличивать количество тредов и серверные мощности.


    1. zzzcpan
      16.10.2016 03:43
      +1

      Смотрите в сторону actor model. Okku или что там в Clojure. С CSP это ад будет, оно только для простеньких вещей годится.


  1. erlyvideo
    16.10.2016 12:15
    +1

    В примере №3 фраза «callback hall» звучит как попытка перестать бороться с нерешаемой проблемой и начать идолизировать её


    1. seryh
      16.10.2016 12:23

      У всех бывают опечатки, личка — отличное место куда можно сообщить о них.


      1. erlyvideo
        16.10.2016 23:40

        но уж очень клевая опечатка =)


  1. rageOfAxe
    16.10.2016 14:47

    Ого, привет, Олег))
    Вы кроме тестов нигде кложуру не протолкнули ещё?


    1. seryh
      16.10.2016 14:48

      привет, тесты + 2 небольших сервиса в продакшене