Asterisk — фи, это же моветон


Здравствуйте уважаемые читатели этого замечательного ресурса. По уже сложившейся традиции — являюсь давним читателем habr'а, но только сейчас решил сделать пост. Что, собственно, побудило к написанию? Честно сказать, и сам не знаю. То ли притянутые статьи о производительности FreeSWITCH/Yate/3CX/etc в сравнении с Asterisk, то ли действительные, реальные проблемы архитектуры последнего, а, возможно, желание сделать что-нибудь уникальное.


И что удивительно, в первом случае, как правило, сравнивают мягкое и теплое, так сказать, FreeSWITCH/Yate/etc и FreePBX. Да-да, именно FreePBX. Это не опечатка. Причем интересно, что во всех сравнениях зачастую один Asterisk в дефолтной конфигурации. Ну, вы знаете, эта конфигурация — загруженные все имеющиеся модули, кривой диалплан (FreePBX как бы способствует) и куча остальной необъективщины. Что до родовых болячек Asterisk'а — да, объективно их вагон и маленькая тележка.


Что со всем этим делать? Разрушать стереотипы и исправлять родовые травмы. Этим и займемся.


Скрещиваем ежа с ужом


Многие из новичков испытывают дискомфорт, глядя на синтаксис описания диалплана в Asterisk'е, а некоторые на полном серьезе обосновывают выбор другого сервера телефонии именно необходимостью писать диалплан в том виде, в котором он есть по дефолту. Типа, перелопачивать многострочный XML — это верх комфорта. Да, есть возможность юзать LUA/AEL, и это хорошо. Но лично я отнес бы эту возможность в минусы и в частности то, что касается pbx_lua.


Как уже было сказано, иметь возможность описывать диалплан полноценным языком программирования — это хорошо. Проблема в том, что время жизни скрипта и его окружения равно времени жизни канала. Для каждого нового канала запускается свой экземпляр скрипта, следовательно, прощай, разделяемые между каналами переменные, единичная загрузка сторонних модулей, один разделяемый коннект к базе и т.д., и т.п. Строго говоря, от встраиваемого языка описания сценария этого и не нужно, но уж сильно хочется. А если хочется – значит, надо.


Итак, от классического Asterisk'а возьмем принципы pbx_lua, от Yate возьмем модель маршрутизации, а от FreeSWITCH ничего брать не будем, ибо "overhead" не нужен. ОК, с тем, что нам нужно родить, определились. Что же будем использовать для генетических экспериментов:


  • Asterisk, причем хотелось бы без привязок к версии. Тот же ARI был анонсирован, если мне не изменяет память, с 12-й версии. Если учесть, что до сих пор где то юзаются 1.8/1.6, а возможно и 1.4, то зависимость от версионных плюшек нам не нужна.
  • Lua — замечательный, гибкий и крайне функциональный скриптовый язык. Сам бог велел, так сказать, без комментариев.
  • Lunapark — интересный проект на github'е, своего рода сервер voip-приложений.

Про Lunapark стоит рассказать подробнее. Это сервер, реализующий потенциал AMI-протокола в связке с классическим FastAGI, что немаловажно в едином пространстве выполнения. То есть, получаем аналог ARI посредством тесной кооперации AGI и AMI в одном флаконе.


Предвижу логичный вопрос: для чего это все? Есть же Asterisk REST Interface, чей функционал ты тут пытаешься переизобрести! Ответ на этот вопрос неоднозначен. Согласен, ARI декларирует ряд преимуществ: да, он асинхронен, да, позволяет работать с "сырыми" примитивами, WebSockets и да, стильный, модный, молодежный XML/JSON — куда ж без него. Но, черт возьми, часть этих так называемых преимуществ крайне сомнительна и добавляет один, а то и более уровней абстракции. Другая же часть — вообще не преимущества. Преимущества — это когда что-то свойственно только тебе, ниже мы это увидим на примере той же асинхронности.


Как это работает? Стандартными средствами заворачиваем канал в FastAGI-приложение, внутри которого получаем возможность управлять звонком, как будто юзаем pbx_lua с незначительным изменением синтаксиса. Вишенкой на торте является возможность управлять состоянием самого Asterisk’а и окружением канала, для этого в распоряжении текущего FastAGI-приложения есть глобальный AMI-объект. Кстати, можно не заворачивать канал в FastAGI-приложение, а создать глобальный обработчик события, допустим, для NewChannel. А это уже преимущество по сравнению с ARI, там как известно, вне stasis'а ARI слеп.


Реализован Lunapark в лучших традициях кооперативной многозадачности, а именно всеми любимая асинхронность на сопрограммах. И как следствие отсутствие проблем с "shared data". То есть плюсы присутствуют, но и проблемы появляются. Одна из них — это необходимость описывать логику с оглядкой на асинхронность, но я думаю, это мы как-нибудь переживем.


Что дальше?


Синтаксис описания контекстов — а что же с ним не так? Да все с ним нормально, более того, практику написания диалплана нужно прописывать как профилактику для формирования структурированного мышления у новичков. Но, вместе с тем нужно понижать порог вхождения. Поэтому будем упрощать и в то же время добавлять функционала.


Простой пример:


[test]
exten => _XXX/102,1,Hangup()
exten => _XXX,1,Dial(SIP/${EXTEN})

В этом примере идет дозвон до трехзнака, кроме абонента 102. Вроде бы все логично и лаконично за исключением того, что шаблоны соответствия экстеншена ограничены небольшим набором правил, а так называемая extended маршрутизация возможна только по CallerID звонящего. А хотелось бы, к примеру, по CallerIDName или по текущему состоянию звонящего канала, а возможно по имени самого канала, а если реализовать полноценный regexp, так вообще красота. И да, я знаю, все эти хотелки можно реализовать, расписав контекст в таком виде:


[test]
exten => _XXX/102,1,Hangup()

; по CallerIDName
exten => _XXX,1,ExecIf($[ "${CALLERID(name)}" == "Vasya" ]?Hangup())

; По состоянию канала
exten => _XXX,n,ExecIf($[ "${CHANNEL(state)}" != "Ring" ]?Hangup())

; По имени канала
exten => _XXX,n,ExecIf($[ "${CUT(CUT(CHANNEL,-,1),/,2)}" == "333" ]?Hangup())

exten => _XXX,n,Dial(SIP/${EXTEN})

Но мой внутренний перфекционист начинает бунтовать при виде такого, а если представить аналогичную выборку по всем пользователям, да еще и действия нужны разные и посложнее Hangup'а, то extensions.conf превращается в длииинную портянку вызовов Goto, GoSub, Macro и, не дай бог, с каналами типа Local.


Выход один — прикручивать свои правила маршрутизации с подкидным и дамами с низкой социальной ответственностью.


В качестве примера:


${Exten}:match('%d%d%d')
           and 
(
  ${CallerIDNum}:match('201') or 
  ${CallerIDName}:match('Vasya') or 
  ${State}:lower() ~= 'ring' or 
  ${Channel}:match('^[^/]+/([^%-]+)') == '333'
) => Hangup();

${Exten}:match('%d%d%d') => Dial {callee = ('SIP/%s'):format(${Exten})};

Хм, вырвиглазненько получилось, но на удивление читается и понимается с первого взгляда. А самое главное, что у нас появился аналог regexp'ов и группировка правил на действие, что, несомненно, упростит составление маршрутов в будущем.


Что тут думать, прыгать надо.


В итоге имеем Lunapark как замену pbx_lua. Его средствами нам и нужно создать логику обработки нашей модели маршрутизации. Для начала нужно распарсить набор правил и заменить все вхождения ${...} на соответствующие им значения, то есть привести к виду ('...'). Значения будут браться из окружения текущего канала.
Затем приводим каждое правило к виду условного оператора, чтобы получить нечто похожее:


-- Exten = 123
-- Sate = Ring
-- CallerIDNum = 100
-- CallerIDName = Test
-- Channel = SIP/100-00000012c

if ('123'):match('%d%d%d') and
(
  ('100'):match('201') or
  ('Test'):match('Vasya') or
  ('Ring'):lower() ~= 'ring' or
  ('SIP/100-00000012c'):match('^[^/]+/([^%-]+)') == '333'
) then
  Hangup()
end

if ('123'):match('%d%d%d') then
  Dial {callee = ('SIP/%s'):format(('123'))}
end

Делать это будут две функции fmt и syntax соответственно:


local fmt = function(str, tab)
 return (str:gsub('(%${[^}{]+})', function(w)
  local mark = w:sub(3, -2) 

  return (mark:gsub('(.+)',function(v)
   local out = tab[v] or v

   return ("('%s')"):format(out)
  end))
 end))
end

local syntax = function(str)
 return (str:gsub('([^;]+)=>([^;]+)',function(p,r)
  return ([[ 
   if %s then
    %s
   end
  ]]):format(p,r)
 end))
end

В принципе ничего сложного, все просто и понятно. Идем дальше — считывать наши правила будем из файла в переменную при старте сервиса, а парсить их уже при звонке с актуальным окружением. Считыванием правил займется функция routes.


local routes = function(...)
  local conf, content = ...

  local f, err = io.open(conf, "r")

  if io.type(f) ~= 'file' then
   log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у
   return ""
  else
   content = f:read('*all')
  end

  f:close() return content
end

Осталось сделать две вещи: завернуть звонки в Lunapark и соответственно их обработать с учетом наших маршрутов. Тут стоит немного пояснить такой момент — в Lunapark вся логика описывается в handler'е. Это текстовый файл, в котором мы будем определять наши FastAGI-приложения и работать с AMI и нашими маршрутами.


Как уже было сказано, объект AMI — глобальный и, помимо роли AMI-клиента, может устанавливать своего рода слушатели для конкретных AMI событий. Этим мы и воспользуемся, но для начала сделаем некоторые приготовления в extensions.conf.


[default]
exten => _[hit],1,NoOp()
exten => _.,n,Wait(5)

exten => _.,1,AGI(agi://127.0.0.1/${EXTEN}${IF($[ "X${PRMS}" != "X" ]?"?${PRMS}")})

Wait(5) в примере выше позволит нам не обрывать канал при завершении FastAGI-приложения, так как в маршрутах может быть описано несколько приложений, а выполнение их осуществляется по средствам Redirect на контекст default по ${EXTEN}.


Таким образом, беря во внимание все выше описанное и помня о кооперативной природе Lunapark'а, попробуем закодить логику обработки маршрутов через FastAGI-приложения.


-- Считываем наши правила в переменную rules
local rules = routes('routes.conf')
-- Очищаем все обработчики, таким образом очистятся только не именные обработчики
-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT
ami.removeEvents('*')
-- Обработчик события создания нового канала
ami.addEvents {
 ['newchannel'] = function(e)
  -- Условия, только каналы с набором и каналы с контекстом users
  if (e['Context'] and e['Context']:match('users')) and e['Exten'] then
   -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся
   local step
   -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения
   local count = 0
   -- Парсим маршруты для текущего окружения канала
   local code, err = loadstring(syntax(fmt(rules,e))) 
   -- В описании маршрутов нет ошибок, двигаемся дальше
   if type(code) == 'function' then
    -- Проксируем будущие FastAGI приложения 
    setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)
     -- Вот они последствия кооперативности
     return coroutine.wrap(
      function(...)
       local prms = {} -- Будущие параметры FastAGI приложения
       local owner = t -- Копия окружения
       local event = e -- Копия таблицы event
       local thread = coroutine.running() -- ID текущей сопрограммы 
       -- Парсим параметры и приводим к виду URI
       for p,v in pairs({...}) do
        if type(v) == 'table' then
         for key, val in pairs(v) do
          table.insert(prms,("%s=%s"):format(key,val))
         end
        else
         table.insert(prms,("%s=%s"):format(p,v))
        end
       end
       -- Если это не первое FastAGI приложение в цепочке
       if step then
        -- Запоминаем предыдущее перед этим
        local last = ("%s"):format(step)
        -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям
        -- И записываем в таблицу indexes(в окружении) их порядковые номера
        -- Именные обработчики требуют последующего удаления самостоятельно
        table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)
         -- Ловим событие AGIStatus указывающее на завершение приложения
         -- Если это предыдущее перед нами, пробуждаем сопрограмму
         if (evt['Channel'] and evt['Channel'] == event['Channel'])
               and
          (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))
               and
          (evt['Script'] and evt['Script'] == last)
         then
          -- Соответствие порядкового номера нашей сопрограмме
          -- В цепочке может быть вызов одного приложения несколько раз
          -- Это позволит выполнять сопрограммы в порядке их определения
          if owner['indexes'][count] == thread then
           if coroutine.status(thread) ~= 'dead' then
            coroutine.resume(thread)
           end
          end
         end
        end,thread))
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Приостанавливаем сопрограмму
        coroutine.yield()
       else -- Здесь обрабатывается первое FastAGI приложение в цепочке
        local index -- Индекс для обработчика Hangup события
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Добавляем ИМЕННОЙ обработчик события Hangup для канала
        -- В этом месте подчищаем за собой
        index = ami.addEvent('Hangup',function(evt)
         if evt['Channel'] and evt['Channel'] == event['Channel'] then
          -- Удаляем обработчик событие Hangup по ранее запомненному индексу
          ami.removeEvent('Hangup',index)
          -- Удаляем все обработчики цепочек выполнения по индексу
          for _,v in pairs(owner['indexes']) do
           ami.removeEvent('UserEvent',v)
          end
          -- Делаем приятно сборщику мусора
          owner = nil
         end
        end,thread)
       end
       -- По средствам AMI выставляем переменную для канала и вызова в цепочке
       ami.setvar{
        Value = table.concat(prms,'&'),
        Channel = event['Channel'],
        Variable = 'PRMS'
       }
       -- Перенаправляем канал на AGI-приложение через контекст default
       ami.redirect{
        Exten = k,
        Priority = 1,
        Channel = event['Channel'],
        Context = 'default'
       }
       -- Выставляем индекс приложения
       count = count + 1
      end)
    end}))()
   else
    -- Если что-то пошло не так
    log.warn(err)
   end
  end
 end
}

Для тех, кто не смотрел или не понял код, поясню простым языком, что же мы там накодили. Для каждого действия в маршруте создается сопрограмма с копированием всего, чего нужно через замыкания и в приостановленном состоянии. Кроме сопрограммы первого действия, она выполняется как есть. Приостанавливаем мы их для эмуляции последовательного выполнения, то есть строго одна за другой, вторая после первой, третья после второй и т.д.


Стоп, так они и так друг за другом выполняются. На самом деле нет — если не эмулировать синхронность, то асинхронный redirect пробежится по каждому действию и займет это доли секунды. В нашем же коде мы выполняем каждое действие, по наступлению определенного события, а именно завершении предыдущего FastAGI-приложения. Lunapark заботливо генерирует специальный UserEvent по окончании выполнения каждого FastAGI-приложения с соответствующими параметрами — вот на это событие и ориентируемся. Сами же сопрограммы просто редиректят текущий канал в контекст default с экстеншном, равным текущему действию, предварительно установив переменную канала PRMS.


Самое интересное, что звонок после redirect'а придет опять в handler, но уже в контексте выполнения AGI и на соответствующее приложение. В нашем случае это Hangup() и Dial(). Давайте же напишем их для полноты повествования.


function Hangup(...)
  local app, channel = ... -- В этом отличие от pbx_lua

  app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))
  app.hangup()
end

function Dial(...)
  local app, channel = ...
  local leg = app.agi.params['callee'] or ''

  app.verbose(('Trying to make a call from %s to %s'):format(
   channel.get('CALLERID(num)'),
   leg:match('^[^/]+/([^%-]+)'))
  )
  app.dial(leg)
end

Ну, вот и все — допрыгались


Итак, давайте подытожим. Что же мы получили в результате этих генетических экспериментов?


  • гибкий, функциональный подход описания маршрутов;
  • возможность создания полнофункциональных VoIP-приложений. Теперь нам не нужно приложение Queue, мы можем его сами написать, не заморачиваясь с созданием своего собственного модуля для asterisk'а;
  • вынесли логику формирования, управления звонками на сторону сервера VoIP-приложений, тем самым сделав из asterisk'а Mediahub, что позволило повысить производительность VoIP-системы в целом;
  • возможность использовать достаточно простой, расширяемый и очень гибкий скриптовый язык для создания VoIP-приложений;
  • расширили возможности интеграции с внешними системами из VoIP-приложений.

Кому как, а мне пока все нравится.


handler целиком
local fmt = function(str, tab)
 return (str:gsub('(%${[^}{]+})', function(w)
  local mark = w:sub(3, -2) 

  return (mark:gsub('(.+)',function(v)
   local out = tab[v] or v

   return ("('%s')"):format(out)
  end))
 end))
end

local syntax = function(str)
 return (str:gsub('([^;]+)=>([^;]+)',function(p,r)
  return ([[ 
   if %s then
    %s
   end
  ]]):format(p,r)
 end))
end

local routes = function(...)
  local conf, content = ...

  local f, err = io.open(conf, "r")

  if io.type(f) ~= 'file' then
   log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у
   return ""
  else
   content = f:read('*all')
  end

  f:close() return content
end

-- Считываем наши правила в переменную rules
local rules = routes('routes.conf')
-- Очищаем все обработчики, причем таким образом очистятся только неименные обработчики событий
-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT
ami.removeEvents('*')
-- Обработчик события создания нового канала
ami.addEvents {
 ['newchannel'] = function(e)
  -- Условия, только каналы с набором и каналы с контекстом users
  if (e['Context'] and e['Context']:match('users')) and e['Exten'] then
   local step -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся
   local count = 0 -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения
   -- Парсим маршруты для текущего окружения канала
   local code, err = loadstring(syntax(fmt(rules,e))) 
   -- В описании маршрутов нет ошибок, двигаемся дальше
   if type(code) == 'function' then
    -- Проксируем будущие FastAGI приложения 
    setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)
     -- Вот они последствия кооперативности
     return coroutine.wrap(
      function(...)
       local prms = {} -- Будущие параметры FastAGI приложения
       local owner = t -- Копия окружения
       local event = e -- Копия таблицы event
       local thread = coroutine.running() -- ID текущей сопрограммы 
       -- Парсим параметры и приводим к виду URI
       for p,v in pairs({...}) do
        if type(v) == 'table' then
         for key, val in pairs(v) do
          table.insert(prms,("%s=%s"):format(key,val))
         end
        else
         table.insert(prms,("%s=%s"):format(p,v))
        end
       end
       -- Если это не первое FastAGI приложение в цепочке
       if step then
        -- Запоминаем предыдущее перед этим
        local last = ("%s"):format(step)
        -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям
        -- И записываем в таблицу indexes(в окружении) их порядковые номера
        -- Именные обработчики требуют последующего удаления самостоятельно
        table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)
         -- Ловим событие AGIStatus указывающее на завершение приложения
         -- Если это предыдущее перед нами, пробуждаем сопрограмму
         if (evt['Channel'] and evt['Channel'] == event['Channel'])
               and
          (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))
               and
          (evt['Script'] and evt['Script'] == last)
         then
          -- Соответствие порядкового номера нашей сопрограмме
          -- В цепочке может быть вызов одного приложения несколько раз
          -- Это позволит выполнять сопрограммы в порядке их определения
          if owner['indexes'][count] == thread then
           if coroutine.status(thread) ~= 'dead' then
            coroutine.resume(thread)
           end
          end
         end
        end,thread))
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Приостанавливаем сопрограмму
        coroutine.yield()
       else -- Здесь обрабатывается первое FastAGI приложение в цепочке
        local index -- Индекс для обработчика Hangup события
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Добавляем ИМЕННОЙ обработчик события Hangup для канала
        -- В этом месте подчищаем за собой
        index = ami.addEvent('Hangup',function(evt)
         if evt['Channel'] and evt['Channel'] == event['Channel'] then
          -- Удаляем обработчик событие Hangup по ранее запомненному индексу
          ami.removeEvent('Hangup',index)
          -- Удаляем все обработчики цепочек выполнения по индексу
          for _,v in pairs(owner['indexes']) do
           ami.removeEvent('UserEvent',v)
          end
          -- Делаем приятно сборщику мусора
          owner = nil
         end
        end,thread)
       end
       -- По средствам AMI выставляем переменную для канала и вызова в цепочке
       ami.setvar{
        Value = table.concat(prms,'&'),
        Channel = event['Channel'],
        Variable = 'PRMS'
       }
       -- Перенаправляем канал на AGI-приложение через контекст default
       ami.redirect{
        Exten = k,
        Priority = 1,
        Channel = event['Channel'],
        Context = 'default'
       }
       -- Выставляем индекс приложения
       count = count + 1
      end)
    end}))()
   else
    -- Если что-то пошло не так
    log.warn(err)
   end
  end
 end
}

function Hangup(...)
  local app, channel = ... -- В этом отличие от pbx_lua

  app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))
  app.hangup()
 end

function Dial(...)
  local app, channel = ...
  local leg = app.agi.params['callee'] or ''

  app.verbose(('Trying to make a call from %s to %s'):format(
   channel.get('CALLERID(num)'),
   leg:match('^[^/]+/([^%-]+)'))
  )
  app.dial(leg)
 end