Ругает VoIP, discord и др. за фатальный недостаток
Ругает VoIP, discord и др. за фатальный недостаток

Пару лет назад, когда все эти гитхабры для меня были птичьим пением, а делать мне было нечего — писал я значит небольшие проекты на python. Среди них был простенький голосовой чат на двоих через TCP. Но вот я нашел его и захотел допилить. Лучший способ допилить что‑то — это придумать заново. Всё что вы прочитаете далее — есть мой гайд по изготовлению велосипеда и не претендует на звание полноценного презентабельного проекта.


Вот основные логические поинты предстоящего проекта, на основании ошибок старого и полученного с того момента мной опыта:

  • Клиент делится на две независимых части:

    • 1. Sender (I) читает микрофон и отправляет на адреса получателей из списка

    • 2. Receiver (O) получает все аудиоданные и выводит их в наушники

  • Ты можешь отправить свой голос кому угодно, но не можешь случайно подслушать чужой.

  • Должно быть удобство использования при большом количестве пользователей

  • Рассчитываю на подобную схему:

    • Ты слышишь всё, что тебе приходит (но желательно иметь возможность отключать звук конкретных пользователей aka черный список)

    • Ты можешь отправить свой голос кому угодно, если знаешь его IP адрес.

TCP или UDP?

Интересующие меня в данном случае характеристики:

  • TCP:

    • + Сразу понятно как суммировать аудиопоток от множества людей - просто накладывать данные со всех сессий.

    • - Есть необходимость создавать по сессии с каждым участником - не удобно когда их много.

  • UDP:

    • + Не нужно создавать сессии (просто кидаешь данные напрямую по адресу)

    • - При получении данных не известно от какого кол-ва участников они приходят (все вперемешку)

UDP более схож с моим концептом и удобен когда участников много - его и реализуем.

Надёжный план работы voicechata через UDP
Надёжный план работы voicechata через UDP

Поскольку мы настроены серьёзно, наш проект будет солидно использовать venv.

Подготовка среды

mkdir pycord
cd pycord
python -m venv venv
venv\scripts\activate

pip install numpy
pip install pyaudio
pip install soundfile

Cоздаем папку, окружение, создаем один файлик PyCord.py.

Спойлер

Серьёзно, никаких, api, тестов — фулстак вам пухом.

Импортируем то, что точно понадобится.

import io
import pyaudio
import socket
import numpy as np
import soundfile as sf
from threading import Thread

Подготавливаем IO для аудиоданных.

Pyaudio — самая изящная библиотека работы со звуковыми устройствами, которую я только встречал. Создаешь один объект с буферами ввода и вывода — пишешь, читаешь, все интуитивно понятно.

CHUNK=2048
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
p = pyaudio.PyAudio()

stream=p.open(format=FORMAT, # Этот объект будет обеспечивать нам IO голоса
            channels=CHANNELS,
            rate=RATE,
            input=True,    
            output=True,
            frames_per_buffer=CHUNK)

Сжатие аудио кодеком

Я импортировал soundfile - библиотеку для работы с файлами MP3, FLAC, WAV, OGG и другими. Можно было бы так всё и оставить: для кодировки писать в файл нужного типа, пересылать его, а потом читать из файла заданного типа. Но это путь безумия.

Работа с файлами чревата работой с памятью на жестком диске - но мы же не хотим что бы весь аудиопоток гнался через диск, верно?

Для этого я реализовал энкодер и дэкодер, которые подсовывают soundfile io.BytesIO - ведь это file like object.

def RAW_2_OGG(raw_chunk):
  byte_io = io.BytesIO()
  signal = np.frombuffer(raw_chunk,dtype=np.float32)
  

  sf.write(byte_io, signal, RATE,format='OGG') 
  
  return bytes(byte_io.getbuffer( ))


def OGG_2_RAW(ogg_chunk):
  byte_io = io.BytesIO()
  byte_io.write(ogg_chunk)
  byte_io.seek(0)
  
  data, samplerate = sf.read(byte_io)
  
  return np.float32(data)

Soundfile сжимает голос в ~4 раза неявно используя Vorbis кодек (брат Opus, используемого в Discord)

На самом деле, чем больше размер одного чанка, тем эффективнее сжатие, но при этом растёт задержка.

Функции выглядят не сильно громоздко, верно? Но на их реализацию у меня ушло более 6 часов экспериментов - как на всё остальное вместе взятое. Но четырёхкратное сжатие того стоило.

У аудиоданных есть огромное количество представлений, а в документациях пишут что-то уровня "raw audio" или "numpy array"... дальше гадай сам.

Sender (Audio stream -> IPs from list)

Псевдокод:

  1. Читаем кусок данных (далее чанк) из аудиопотока.

  2. Отправляем каждому из списка clients, полученному из get_clients() — об этом далее.

  3. Повторяем до бесконечности пункты 1 и 2.

def Sender():
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    while True:
        clients=get_clients() # Реализуем дальше
        data = stream.read(CHUNK, exception_on_overflow = False)
        data=RAW_2_OGG(data)
        for addr in clients:
            sock.sendto(data, (addr, UDP_PORT))

Receiver (Internet -> Audio stream)

Есть один нюанс, recvfrom блокирует выполнение до получения чанка данных, в диалоге двух человек проблем не будет:

def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.bind((me, UDP_PORT))

    while True:
        num = get_num() # Не реализуем дальше (изменю идею)
        for i in range(num):
            new = sock.recvfrom(CHUNK*10)[0]
            new=OGG_2_RAW(d)
            if i == 0:
                data = new.copy()
            else:
                data += new
        if num != 0:
            print(data.shape)
            stream.write(data.tobytes())
  • человек №1 записал чанк за 1/n секунды и отправил его.

  • человек №2 получил его и воспроизводит в течение 1/n секунды.

Но, если принимать чанки сразу от двух человек:

  • человек №1 записал чанк №1 за 1/n секунды и отправил его

  • человек №2 записал чанк №2 за 1/n секунды и отправил его

  • человек №3 получает их последовательно и воспроизводит в течение 2*1/n секунды

  • Получается, что человек №3 тормоз! Его буфер получения растёт, он должен уметь воспроизводить чанки одновременно.

В случае с аудиоданными всё просто: суммируем их как массивы numpy.

Для решения этой проблемы отключаем блокировку recvfrom так: sock.setblocking(0)

  1. Суммируем чанки пока не выпадает исключение о пустоте буфера.

  2. Полученный наложенный звук с чистой совестью пишем в аудиопоток.

def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.setblocking(0)
    sock.bind((me, UDP_PORT))


    while True:
        c=0
        first=True
        while True:
            blacklist=get_black() # Реализуем дальше
            try:
                d,addr=sock.recvfrom(CHUNK*10)
                if addr[0] in blacklist:
                    continue
                new=OGG_2_RAW(d)
                c+=1
                if first:
                    data=new.copy()
                    first=False
                else:
                    data+=new
            except:
                break
        if not first:
            stream.write(data.tobytes())

Как вы могли заметить, функции используют get_black() для ЧС и get_clients() для списка адресов отправки своего голоса

Getters & setters для динамического изменения clients, blacklist

black,clients=[],[]

def set_black(s):
    global black
    black = s

    
def get_black():
    global black
    return black

    
def set_clients(s):
    global clients
    clients = s

    
def get_clients():
    global clients
    return clients

Как способ общения с Reciver и Sender

sender = Thread(target=Sender, args=())
sender.start()

recv = Thread(target=Receiver, args=())
recv.start()

Обе функции запущены как потоки, и теперь работают автономно, как теперь контролировать их поведение?

set_black(["192.168.0.1"]) # Изменится количество принимаемых аудиопотоков на 2
set_clients(["127.0.0.1","192.168.0.15"]) #Теперь аудио отправляется на оба адреса

Подобная схема была выбрана, для удобства реализации взаимодействия с интерфейсом.

О нём сейчас и поговорим.

UI

Можно было бы идти по знакомой дорожке, именуемой Tk, или схожей, но чуть менее популярной Qt, можно было даже упороться в стиль с DearPyGui, но это всё заезжено и банально.

И тут я подумал: А почему бы не сделать веб-интерфейс?. Первое что пришло в голову - Gradio.

Просто было интересно сделать что-то с веб фронтом, а запариваться с отдельным фронтэндом на JS и даже api не уместно, если делаешь что-то десктопное вроде моего текущего клиента.

За следующие пару минут я не передумал - значит настрой серьёзен как никогда.

pip install gradio
import gradio as gr
Спойлер

Думаю, всё получиться, так ещё и возможность управления PyCord c телефона или другого устройства, по веб майке Gradio.

Gradio позволяет конструировать webui для своего backendа прямо в его коде, при помощи интуитивно понятного конструктора.

WebUI будет иметь две вкладки:

  • Отправлять

    • Слайдер выбора кол-ва адресов

    • Динамическое количество полей ввода под эти адреса

  • Чёрный список

    • То же самое для конфигурации чёрного списка

import gradio as gr

def variable_outputs(k):
    k = int(k)
    return [gr.Textbox.update(visible=True)]*k + [gr.Textbox.update(visible=False)]*(max_textboxes-k)

  
max_textboxes = 10
with gr.Blocks() as demo:
    gr.Markdown("PyCord")

    with gr.Tab("Отправлять"):
        snd = gr.Slider(0, max_textboxes, value=10, step=1, label="Адреса получателей")
        textboxes = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes.append(t)
        snd.change(variable_outputs, snd, textboxes)
        button = gr.Button("Начать отправку")
        button.click(lambda *s:set_clients(s[:s[-1]]),textboxes+[snd])

    with gr.Tab("Чёрный список"):
        snd2 = gr.Slider(0, max_textboxes, value=10, step=1, label="Блокировать входящие с адресов")
        textboxes2 = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes2.append(t)
        snd2.change(variable_outputs, snd2, textboxes2)
        button2 = gr.Button("БАН")
        button2.click(lambda *s:set_black(s[:s[-1]]),textboxes2+[snd2])

"Gradio is the fastest way to demo your machine learning model with a friendly web interface so that anyone can use it, anywhere!" - я знал на что шёл, честно

Sender
Sender
Receiver
Receiver

Делать Webui к этому велосипеду была дикость, знаю. Зато можно управлять клиентом на пк из браузера на телефоне!

как кроссплатформенность - но нет
как кроссплатформенность - но нет

Напоминаю, что клиент всё еще работает локально на ПК, а это «интерфейс на вынос».

На телефоне интерфейс даже не выглядит пустым.

Но есть одна проблема, которая перекрывает всё удобство использования gradio интерфейсов на телефоне — это случайные нажатия.

Ты просто проводишь пальцем мимо слайдера/кнопки и они могут на это среагировать... Но ощущается это только тогда, когда интерфейс не вмещается в один экран телефона, и ты вынужден перематывать его свайпами.

Всё работает, но

Но кажется я понял — почему gradio не используют в подобных кейсах: оно очень, очень медленное

Оно долго обновляет фреймы, оно очень долго стартует. Этот интерфейс максимально неотзывчивый, он ловит случайные нажатия при свайпах на телефоне. При всём этом переделать под Tk не составит труда (изначальная версия была именно под него, но потом я решил сделать нечто необычное).

Изначальный прототип на TCP
Полный код текущего проекта
import io
import pyaudio
import soundfile as sf
import sys

import socket
import numpy as np

from threading import Thread
import gradio as gr

def RAW_2_OGG(raw_chunk):
  byte_io = io.BytesIO()
  signal = np.frombuffer(raw_chunk,dtype=np.float32)
  old=sys.getsizeof(raw_chunk)

  sf.write(byte_io, signal, RATE,format='OGG') 
  b=bytes(byte_io.getbuffer( ))
  n=sys.getsizeof(b)
  print(n/old)
  return b


def OGG_2_RAW(ogg_chunk):
  byte_io = io.BytesIO()
  byte_io.write(ogg_chunk)
  byte_io.seek(0)
  
  data, samplerate = sf.read(byte_io)
  
  return np.float32(data)


CHUNK=4096
FORMAT = pyaudio.paFloat32
CHANNELS = 1
RATE = 16000
p = pyaudio.PyAudio()

stream=p.open(format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            input=True,
            output=True,
            frames_per_buffer=CHUNK)




black,clients=[],[]

def set_black(s):
    global black
    black=s

def get_black():
    global black
    return black

def set_clients(s):
    global clients
    clients=s

def get_clients():
    global clients
    return clients

def Sender():
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    while True:
        clients=get_clients()
        data = stream.read(CHUNK, exception_on_overflow = False)
        data=RAW_2_OGG(data)
        #print(len(data))
        for addr in clients:
            sock.sendto(data, (addr, UDP_PORT))



def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.setblocking(0)
    sock.bind((me, UDP_PORT))


    while True:
        c=0
        first=True
        while True:
            blacklist=get_black()
            try:
                d,addr=sock.recvfrom(CHUNK*10)
                if addr[0] in blacklist:
                    continue
                new=OGG_2_RAW(d)
                #new=np.frombuffer(d,dtype=np.uint16) # buffer size is 1024 bytes
                c+=1
                if first:
                    data=new.copy()
                    first=False
                else:
                    data+=new
            except:
                break
        if not first:
            #print(c)
            stream.write(data.tobytes())




def variable_outputs(k):
    k = int(k)
    return [gr.Textbox.update(visible=True)]*k + [gr.Textbox.update(visible=False)]*(max_textboxes-k)


max_textboxes=10
with gr.Blocks() as demo:
    gr.Markdown("# PyCord")
    with gr.Tab("Отправлять"):
        snd = gr.Slider(0, max_textboxes, value=10, step=1, label="Адреса получателей")
        textboxes = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes.append(t)
        snd.change(variable_outputs, snd, textboxes)
        button = gr.Button("Начать отправку")
        button.click(lambda *s:set_clients(s[:s[-1]]),textboxes+[snd])

    with gr.Tab("Чёрный список"):
        snd2 = gr.Slider(0, max_textboxes, value=10, step=1, label="Блокировать входящие с адресов")
        textboxes2 = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes2.append(t)
        snd2.change(variable_outputs, snd2, textboxes2)
        button2 = gr.Button("БАН")
        button2.click(lambda *s:set_black(s[:s[-1]]),textboxes2+[snd2])
    


sender = Thread(target=Sender, args=())
sender.start()

recv = Thread(target=Receiver, args=())
recv.start()

demo.launch()

Finally

Никому не советую использовать gradio не по назначению — вот и весь вывод.

Спасибо за прочтение, надеюсь вы смогли найти что‑то интересное в этой статье, раз уж дочитали до сюда. Критика и идеи ожидаю в комментариях, на этом у меня всё, всем понятных сокетов и добра.

Комментарии (8)


  1. Disinterpreter
    19.07.2023 09:54
    +2

    Вы неплохо усвоили программирование на питон, и даже можете написать такое. Но, хочу побыть душнилой.

    На данный момент это тянет только на PoC и не более, так как

    1. Как я заметил, сам чат не является асинхронным приложением, никаких aio библиотек не используется

    2. Не очень понятно, хорошо ли он пробивает CGNAT

    3. Не видно демонстрации работы с IPv6

    4. Нет ни слова про работу кодеков и сжатие данных

    Возможно, другие люди добавят что-то еще.


    1. CodeDroidX Автор
      19.07.2023 09:54

      Спасибо за рекомендации, учту ваши советы!

      1. Да, это скорее Proof of concept нежели полноценное приложение, что бы выйти в люди. Про работу кодеков или сжатие данных нет, ведь аудиоданные тут фактически сырые, и да, это действительно худшее решение для реального сетевого ПО.

      2. Не совсем понял зачем пробивать NAT... речь идёт о временном пробросе портов? Или о некой проблеме которая затрагивает непрерывный поток схожих пакетов при маршрутизации? Так же не совсем понимаю что может пойти не так при использовании IPv6, если с v4 все работает корректно.


      1. Disinterpreter
        19.07.2023 09:54

        По второму, ну если я хочу сделать аудиозвонок с человеком через интернет, при чем я буду с мобильного интернета. Есть ли гарантия что звонок состоится? Про IPv6 лучше сказать "пока не попробуешь - не узнаешь".


    1. adron_s
      19.07.2023 09:54

      Хотелось бы увидеть доработанный код с использованием кодеков. Сырой аудио поток передавать через интернет - это как то совсем не правильно.


      1. GreenVic
        19.07.2023 09:54

        Да не важно. Поток звуковой это копеечная полоса.


        1. Abobcum
          19.07.2023 09:54

          Вовсе нет, сырой поток - это примерно 1.5 Мбит/с всего лишь для двух участников. Если их 5, то будет уже 3.5 Мбит/с - а это сравнимо с скоростью мобильного интернета.


          1. GreenVic
            19.07.2023 09:54

            RATE = 44100. Для голоса это супер избыточно. Если разумно снизить этот параметр, то полоса здорово подрезается. А если включить VAD, то вообще остаются крохи


      1. CodeDroidX Автор
        19.07.2023 09:54

        Спасибо за идею, приделал кодировку