Честно признаюсь, я долго думал, стоит ли публиковать этот материал. Для тех, кто умеет работать с AMI Asterisk, ничего интересного тут нет. Для тех, кто только начинает что-то делать, вряд ли разберётся в моём коде (хотя я старался писать понятно). Вангую комментарии вроде: «Зачем использовать Хабр для своих заметок?». С другой стороны, приведённый под катом скрипт может стать кому-то отправной точкой. Скрипт ничего не делает кроме того, что шлёт в консоль все события из AMI и умеет их фильтровать. Для примера, я показываю в консоле все звонки, которые попадают в любой из контекстов «zadarma-in» или «sibseti_in». Если заинтересовал, прошу под кат:

Появилась задача в режиме реального времени смотреть, с какого транка пришел звонок, на какие кнопки в ivr нажал пользователь, кто ответил на звонок и т.д. Давно хотел попробовать поработать с AMI из Python, до этого у меня был небольшой опыт работы из Bash и то для организации обратного звонка.

Покопавшись с различными готовыми библиотеками быстро пришло понимание, что ни одна из них мне не подходит. В итоге был изобретён свой «велосипед» в виде скрипта, который всю информацию из AMI отдаёт в json. Используются стандартные библиотеки Python. Плюс в том, что в таком виде легко получить и распарсить любые события и не потерять привязку к конкретному звонку.

Первый скрипт печатает только те события, которые попали в любой из контекстов «zadarma-in» или «sibseti_in».

Скрипт№1
import telnetlib
import json
import re

##
HOST = "192.168.10.10"
PORT = "5038"
user = "zabbix"
password = "password"
##

tn = telnetlib.Telnet(HOST,PORT)
tn.write("Action: login".encode('ascii') + b"\n")
username = "Username: " + user
tn.write(username.encode('ascii') + b"\n")
passWord = "Secret: " + password
string_NOW = ''
string_out = ''
cd = 0

tn.write(passWord.encode('ascii') + b"\n\n")

def telnet_for_string(string):
    global string_out
    string_out_def = ''
    for mes in string:
        try:
            if string[mes]['Context'] == 'zadarma-in' or string[mes]['Context'] == 'sibseti_in' or string[mes]['Context'] == 'IVR':
                Uniqueid = string[mes]['Uniqueid']
                CallerIDNum = string[mes]['CallerIDNum']
                Exten = string[mes]['Exten']
                CallerIDName = string[mes]['CallerIDName']
                try:
                    Digit = string[mes]['Digit']
                except KeyError:
                    Digit = ''
                # if Exten == 's' or Exten == 'h':
                #     Exten = ''
                # Context = string[mes]['Context']
                string_out_def = json.dumps({'Uniqueid': Uniqueid,
                           'CallerIDNum':CallerIDNum,
                           'CallerIDName':CallerIDName,
                           'Exten':Exten,
                           'Digit':Digit})

                # print(string_out_def)
        except UnboundLocalError:
            1+1
        except KeyError:
            1+1
    # print(string_out_def)
    if string_out_def:
        if string_out_def != string_out:
            print(string_out_def)

        string_out = string_out_def


while True:
    string = ''
    event_string = ''
    elements_string = ''
    c = 0

    read_some = tn.read_some()  # Получаем строчку из AMI

    string = read_some.decode('utf8', 'replace').replace('\r\n', '#')   # Декодируем строчки и заменяем переносы строк на #
    # print(string)

    # Отлавливаем начало строки и склеиваем строчку
    if not string.endswith('##'):
        string_NOW = string_NOW + string
        # print('1 --->',string_NOW)

    # Если строчка закончилась, то доклеиваем конец строки и
    # совершаем магию, которая двойной перенос строки в середине строки заменит на $,
    # а все одинарные переносы заменит на #, так-же удалим кавычки и обратные слеши
    if string.endswith('##'):
        string_NOW = string_NOW + string
        string_NOW = string_NOW.replace('##', '$')  # заменяем двойной перенос строки на $
        string_NOW = string_NOW.replace('\n', '#')  # Заменяем  перенос на #
        string_NOW = string_NOW.replace('\r', '#')  # Заменяем  перенос на #
        string_NOW = string_NOW.replace('"', '')    # Удаляем кавычки
        string_NOW = string_NOW.replace('\\', '')   # удаляем обратный слеш

        # print('string_NOW -->',string_NOW)
        # print()

        # Делим полученую строчку на Евенты т.к. двойной перенос как раз её так и делил
        events = re.findall(r'[A-Z][\w]+:\s[^$]+', string_NOW)
        for event in events:
            c+=1
            # print('event ---> ',event)

            event_elements = re.findall(r'[A-Z][\w]+:\s[^#]+', event)   # А тут делим евенты на елемены
            for element in event_elements:
                element = '\"' + element.replace(': ', '\": "') + '\", '# Вручную делаем словарь
                # print('element', element)
                elements_string = elements_string + element # Склеиваем строчки обратно, получаем словарь
            # event_string = event_string + '\"' + elements_string.split(':')[1].split(',')[0].replace('"','') + '\": ' + '{' + elements_string + '}'
            # print(elements_string)
            # print(str(elements_string.split(':')[1].split(',')[0]))

            # собираем обратно евенты попутно формирую json:
            event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}'
            event_string = event_string.replace('}{', '},{')    #   Добавляем запятую между евентами
            event_string = event_string.replace(', }', '}, ')   #
        event_string = '{' + event_string + '}'
        event_string = event_string.replace('}, }', '}}')

        # Превращаем полученую строчку в json, если вдруг есть ошибка в синтаксисе json, то выводим как сам невалидный
        # json, так и строчку  из которой не получилось его собрать.
        try:
            parsed_string = json.loads(event_string)
        except json.decoder.JSONDecodeError:
            print('#############################################', '\n\n\n')
            print(event_string, '\n\n\n')
            print(string_NOW, '\n\n\n')
            print('#############################################', '\n\n\n')

        # print(event_string)
        # print(parsed_string['1'])
        # Отправляем полученую строчку в функуию "telnet_for_string", в которой уже можно обработать полученую строчку.
        telnet_for_string(parsed_string)
        string_NOW = '' # Очищем строчку


И второй скрипт, который пишет в консоль все события, посмотрев на оба скрипта станет понятно что надо менять, чтоб достигнуть нужного результата. Если не совсем понятно, то парсить надо json «string[mes]» в функции «def telnet_for_string(string)»:

Скрипт №2
import telnetlib
import time
import json
import re

##
HOST = "192.168.10.10"
PORT = "5038"
user = "zabbix"
password = "password"
##

tn = telnetlib.Telnet(HOST,PORT)
tn.write("Action: login".encode('ascii') + b"\n")
username = "Username: " + user
tn.write(username.encode('ascii') + b"\n")
passWord = "Secret: " + password
string_NOW = ''
string_out = ''

tn.write(passWord.encode('ascii') + b"\n\n")


def telnet_for_string(string):
    for mes in string:
        print(string[mes])

while True:
    # time.sleep(0.1)
    string = ''
    event_string = ''
    elements_string = ''
    c = 0

    read_some = tn.read_some()

    string = read_some.decode('utf8', 'replace').replace('\r\n', '#')
    # print(string)

    if not string.endswith('##'):
        string_NOW = string_NOW + string


    if string.endswith('##'):
        string_NOW = string_NOW + string
        string_NOW = string_NOW.replace('##', '$')
        string_NOW = string_NOW.replace('\n', '#')
        string_NOW = string_NOW.replace('\r', '#')
        string_NOW = string_NOW.replace('"', '')
        string_NOW = string_NOW.replace('\\', '')


        events = re.findall(r'[A-Z][\w]+:\s[^$]+', string_NOW)
        for event in events:
            c+=1

            event_elements = re.findall(r'[A-Z][\w]+:\s[^#]+', event)
            for element in event_elements:
                element = '\"' + element.replace(': ', '\": "') + '\", '
                elements_string = elements_string + element

            event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}'

            event_string = event_string.replace('}{', '},{')
            event_string = event_string.replace(', }', '}, ')
        event_string = '{' + event_string + '}'
        event_string = event_string.replace('}, }', '}}')
        try:
            parsed_string = json.loads(event_string)
        except json.decoder.JSONDecodeError:
            print('#############################################', '\n\n\n')
            print(event_string, '\n\n\n')
            print(string_NOW, '\n\n\n')
            print('#############################################', '\n\n\n')


        telnet_for_string(parsed_string)
        string_NOW = ''


Комментарии (1)


  1. Tihon_V
    11.07.2018 13:29

    Чем обусловлен выбор AMI вместо ARI?
    Полтора года назад писал аналогичный логгер используя ARI — там это проще выглядело:
    1. Подключится к WS.
    2. Отправить несколько HTTP-запросов (информацию о устройствах, каналах, пирах на которые следует подписатся).
    3. Ответ будет приходить в виде JSON по WS.