Мне очень нравится концепция, когда можно расширить возможности восприятия для искусственного интеллекта. Сегодня формат чата самый понятный и популярный для взаимодействия с ИИ. Безусловно, общение только через чат греет мою интровертивную душу, взращенную на BBS'ках и рассказах о НашBOFH. Но, всё же, почему бы не сделать общение с ботами более "человечным", научить их слушать, слышать и говорить? Всё, о чём дальше пойдёт речь в статье не является какой-то уникальной киллер-фичей, и давно используется во многих сервисах, предоставляющих доступы к искусственному интеллекту (LLM). В этой статье я хочу рассказать о возможных решениях на Python, доступных для любого желающего.
openai.Audio.transcribe
Скорее всего, вы используете ChatGPT в качестве основного LLM движка. Если обратиться к его API, то можно найти специальную модель whisper-1, отвечающую в том числе и за транскрипцию текста. Всё, что вам нужно для того, чтобы перевести ваш голос в текст, нужно вызвать метод openai.Audio.atranscribe. Здесь и далее я буду использовать асинхронные методы, где это возможно. Это удобнее для организации параллельной работы.
import openai
async def transcript(file, prompt=None, language="en", response_format="text"):
"""
Wrapper for the transcribe function. Returns only the content of the message.
:param file: Path with filename to transcript.
:param prompt: Previous prompt. Default is None.
:param language: Language on which audio is. Default is 'en'.
:param response_format: default response format, by default is 'text'.
Possible values are: json, text, srt, verbose_json, or vtt.
:return: transcription (text, json, srt, verbose_json or vtt)
"""
kwargs = {}
if prompt is not None:
kwargs["prompt"] = prompt
return await openai.Audio.atranscribe(
model="whisper-1",
file=file,
language=language,
response_format=response_format,
temperature=1,
**kwargs,
)
Чтобы позвать эту функцию, передадим ей файл.
with open(file_path, "rb") as f:
transcript = await transcript(file=f, language="en")
response = await ask_chat(transcript) # this method is for prompting LLM using pure string
Использовать готовый файл - это конечно неплохо, но стоит для полноты картины предположить, что вы, вероятно, захотите записывать свой голос налету?
Самый простой способ - это использовать sounddevice. Так как sounddevice записывает файл в формате wav, разумно его для передачи через интернет всё-таки сконвертировать в mp3, для этого можно использовать, например, pydab. В итоге код будет выглядеть как-то так:
import os
import tempfile
import uuid
import sounddevice as sd
import soundfile as sf
from pydub import AudioSegment
def record_and_convert_audio(duration: int = 5, frequency_sample: int = 16000):
"""
Records audio for a specified duration and converts it to MP3 format.
This function records audio for a given duration (in seconds) with a specified frequency sample.
The audio is then saved as a temporary .wav file, converted to .mp3 format, and the .wav file is deleted.
The function returns the path to the .mp3 file.
:param duration: The duration of the audio recording in seconds. Default is 5 seconds.
:param frequency_sample: The frequency sample rate of the audio recording. Default is 16000 Hz.
:return: The path to the saved .mp3 file.
"""
print(f"Listening beginning for {duration}s...")
recording = sd.rec(int(duration * frequency_sample), samplerate=frequency_sample, channels=1)
sd.wait() # Wait until recording is finished
print("Recording complete!")
temp_dir = tempfile.gettempdir()
wave_file = f"{temp_dir}/{str(uuid.uuid4())}.wav"
sf.write(wave_file, recording, frequency_sample)
print(f"Temp audiofile saved: {wave_file}")
audio = AudioSegment.from_wav(wave_file)
os.remove(wave_file)
mp3_file = f"{temp_dir}/{str(uuid.uuid4())}.mp3"
audio.export(mp3_file, format="mp3")
print(f"Audio converted to MP3 and stored into {mp3_file}")
return mp3_file
Собственно полученный файл уже можно скормить модели. Но метод выглядит очень топорным, потому что запись продолжается фиксированное время, вне зависимости от того, как вы долго говорите - меньше, чем установленный промежуток и вам придётся ждать окончания записи или больше, что приведёт к обрезке фразы. Обычно самым разумным решением является реализация push-to-talk. Пока пользователь нажимает кнопку, идёт запись. Так работают мессенджеры и многие онлайн чаты. Но мне кажется это всё ещё недостаточно "человечным" решением, потому что не вписывается в концепцию имеющего уши ИИ. Мне, как пользователю консоли, было бы удобнее обойтись без каких-либо кнопок, и, по большому счёту, передать эту работу коду: то есть слушать постоянно, и если в шуме замечена речь, то распознавать её. Ну, почти так, как работает Google Assistant, Siri и умные колонки а-ля Алиса в вашем доме. Если вам не нужно реагировать на любой звук, вы всегда сможете фильтровать вашу catch-фразу, которая будет распознана первой (вначале записи).
import re
pattern = r"hellos*,?s*bunny"
if re.match(pattern, transcript, re.IGNORECASE):
prompt = re.sub(pattern, '', text, flags=re.IGNORECASE).lstrip()
response = await ask_chat(prompt)
Что ж, для этой задачи можно использовать например мой AudioRecorder, основанный на pyaudio. Он будет слушать микрофон и детектировать шум (речь) на фоне тишины используя среднеквадратичное отклонение. Полная под катом.
Hidden text
import math
import os
import struct
import tempfile
import time
import uuid
import wave
import pyaudio
from pydub import AudioSegment
class AudioRecorder:
"""
The AudioRecorder class is for managing an instance of the audio recording and conversion process.
Parameters:
pyaudio_obj (PyAudio): Instance of PyAudio. Default is pyaudio.PyAudio().
threshold (int): The RMS threshold for starting the recording. Default is 15.
channels (int): The number of channels in the audio stream. Default is 1.
chunk (int): The number of frames per buffer. Default is 1024.
f_format (int): The format of the audio stream. Default is pyaudio.paInt16.
rate (int): The sample rate of the audio stream. Default is 16000 Hz.
sample_width (int): The sample width (in bytes) of the audio stream. Default is 2.
timeout_length (int): The length of the timeout for the recording (in seconds). Default is 2 seconds.
temp_dir (str): The directory for storing the temporary .wav and .mp3 files. Default is the system's temporary dir.
normalize (float): The normalization factor for the audio samples. Default is 1.0 / 32768.0.
pa_input (bool): Specifies whether the stream is an input stream. Default is True.
pa_output (bool): Specifies whether the stream is an output stream. Default is True.
"""
def __init__(
self,
pyaudio_obj=pyaudio.PyAudio(),
threshold=15,
channels=1,
chunk=1024,
f_format=pyaudio.paInt16,
rate=16000,
sample_width=2,
timeout_length=2,
temp_dir=tempfile.gettempdir(),
normalize=(1.0 / 32768.0),
pa_input=True,
pa_output=True,
):
"""
General init.
This method initializes an instance of the AudioRecorder class with the specified parameters.
The default values are used for any parameters that are not provided.
:param pyaudio_obj: Instance of PyAudio. Default is pyaudio.PyAudio().
:param threshold: The RMS threshold for starting the recording. Default is 15.
:param channels: The number of channels in the audio stream. Default is 1.
:param chunk: The number of frames per buffer. Default is 1024.
:param f_format: The format of the audio stream. Default is pyaudio.paInt16.
:param rate: The sample rate of the audio stream. Default is 16000 Hz.
:param sample_width: The sample width (in bytes) of the audio stream. Default is 2.
:param timeout_length: The length of the timeout for the recording (in seconds). Default is 2 seconds.
:param temp_dir: The directory for storing the temporary .wav and .mp3 files. Default is temp dir.
:param normalize: The normalization factor for the audio samples. Default is 1.0 / 32768.0.
:param pa_input: Specifies whether the stream is an input stream. Default is True.
:param pa_output: Specifies whether the stream is an output stream. Default is True.
"""
self.___pyaudio = pyaudio_obj
self.___threshold = threshold
self.___channels = channels
self.___chunk = chunk
self.___format = f_format
self.___rate = rate
self.___sample_width = sample_width
self.___timeout_length = timeout_length
self.___temp_dir = temp_dir
self.___normalize = normalize
self.___input = pa_input
self.___output = pa_output
self.stream = self.init_stream(
f_format=self.___format,
channels=self.___channels,
rate=self.___rate,
pa_input=self.___input,
pa_output=self.___output,
frames_per_buffer=self.___chunk,
)
def init_stream(self, f_format, channels, rate, pa_input, pa_output, frames_per_buffer):
"""
Initializes an audio stream with the specified parameters.
This function uses PyAudio to open an audio stream with the given format, channels, rate, input, output,
and frames per buffer.
:param f_format: The format of the audio stream.
:param channels: The number of channels in the audio stream.
:param rate: The sample rate of the audio stream.
:param pa_input: Specifies whether the stream is an input stream. A true value indicates an input stream.
:param pa_output: Specifies whether the stream is an output stream. A true value indicates an output stream.
:param frames_per_buffer: The number of frames per buffer.
:type frames_per_buffer: int
:return: The initialized audio stream.
"""
return self.___pyaudio.open(
format=f_format,
channels=channels,
rate=rate,
input=pa_input,
output=pa_output,
frames_per_buffer=frames_per_buffer,
)
def record(self):
"""
Starts recording audio when noise is detected.
This function starts recording audio when noise above a certain threshold is detected.
The recording continues for a specified timeout length.
The recorded audio is then saved as a .wav file, converted to .mp3 format, and the .wav file is deleted.
The function returns the path to the .mp3 file.
:return: The path to the saved .mp3 file.
"""
print("Noise detected, recording beginning")
rec = []
current = time.time()
end = time.time() + self.___timeout_length
while current <= end:
data = self.stream.read(self.___chunk)
if self.rms(data) >= self.___threshold:
end = time.time() + self.___timeout_length
current = time.time()
rec.append(data)
filename = self.write(b"".join(rec))
return self.convert_to_mp3(filename)
def write(self, recording):
"""
Saves the recorded audio to a .wav file.
This function saves the recorded audio to a .wav file with a unique filename.
The .wav file is saved in the specified temporary directory.
:param recording: The recorded audio data.
:return: The path to the saved .wav file.
"""
filename = os.path.join(self.___temp_dir, f"{str(uuid.uuid4())}.wav")
wave_form = wave.open(filename, "wb")
wave_form.setnchannels(self.___channels)
wave_form.setsampwidth(self.___pyaudio.get_sample_size(self.___format))
wave_form.setframerate(self.___rate)
wave_form.writeframes(recording)
wave_form.close()
return filename
def convert_to_mp3(self, filename):
"""
Converts a .wav file to .mp3 format.
This function converts a .wav file to .mp3 format. The .wav file is deleted after the conversion.
The .mp3 file is saved with a unique filename in the specified temporary directory.
:param filename: The path to the .wav file to be converted.
:return: The path to the saved .mp3 file.
"""
audio = AudioSegment.from_wav(filename)
mp3_file_path = os.path.join(self.___temp_dir, f"{str(uuid.uuid4())}.mp3")
audio.export(mp3_file_path, format="mp3")
os.remove(filename)
return mp3_file_path
def listen(self):
"""
Starts listening for audio.
This function continuously listens for audio and starts recording when the
RMS value of the audio exceeds a certain threshold.
:return: The path to the saved .mp3 file if recording was triggered.
"""
print("Listening beginning...")
while True:
mic_input = self.stream.read(self.___chunk)
rms_val = self.rms(mic_input)
if rms_val > self.___threshold:
return self.record()
def rms(self, frame):
"""
Calculates the Root Mean Square (RMS) value of the audio frame.
This function calculates the RMS value of the audio frame, which is a measure of the power in the audio signal.
:param frame: The audio frame for which to calculate the RMS value.
:return: The RMS value of the audio frame.
"""
count = len(frame) / self.___sample_width
f_format = "%dh" % count
shorts = struct.unpack(f_format, frame)
sum_squares = 0.0
for sample in shorts:
normal_sample = sample * self.___normalize
sum_squares += normal_sample * normal_sample
rms = math.pow(sum_squares / count, 0.5)
return rms * 1000
Возможно, вам нужно будет экспериментально подобрать параметры threshold, timeout, channels, sample_length, chunk и rate в зависимости от вашего микрофона. Ну и наконец, код, чтобы получить запись для модели.
from utils.audio_recorder import AudioRecorder
file_path = AudioRecorder().listen()
speech_recognition
Использовать готовые методы от OpenAI конечно хорошо, но токены не сказать чтобы бесплатные, и, возможно, вы захотите использовать альтернативный подход. Либо вовсе вам не подходит этот метод, потому что вы используете например llama2 или Bard вместо ChatGPT. Тогда альтернативным решением может являться использования библиотеки speech_recognition. Я использую распознание от Google, но при желании вы можете использовать другие движки, например wit, azure, sphinx. В библиотеке есть всё необходимое, так что мы можем распознавать как аудиофайл, так и записывать напрямую, используя класс Microphone(). Так же как и мой AudioRecorder, удобно использовать активацию по голосу. Единственное, что вам следует указать, это язык аудиофайла. Да, это не так гибко и удобно, как в методе от OpenAI, где можно опустить параметр language и надеятся, что система сама выберет правильный язык, но я бы лично не рекомендовал не указывать язык во избежание ошибок. Примерный метод может выглядеть так:
import speech_recognition as sr
class CustomTranscriptor:
"""
This is wrapper class for Google Transcriptor which uses microphone to get audio sample.
"""
def __init__(self, language="en-EN"):
"""
General init.
:param language: Language, what needs to be transcripted.
"""
self.___recognizer = sr.Recognizer()
self.___source = sr.Microphone()
self.language = language
def transcript(self):
"""
This function transcripts audio (from microphone recording) to text using Google transcriptor.
:return: transcripted text (string).
"""
print("Listening beginning...")
with self.___source as source:
audio = self.___recognizer.listen(source, timeout=5)
user_input = None
try:
user_input = self.___recognizer.recognize_google(audio, language=self.language)
except sr.UnknownValueError:
print("Google Speech Recognition can't transcript audio")
except sr.RequestError as error:
print(f"Unable to fetch from resource Google Speech Recognition: {error}")
except sr.WaitTimeoutError as error:
print(f"Input timeout, only silence is get: {error}")
return user_input
Наконец, код для работы с чатом через голос может выглядеть так:
import asyncio
from utils.audio_recorder import AudioRecorder
from utils.transcriptors import CustomTranscriptor
method = "google" # or any to use speech_recognition instead of openai
async def main():
while True:
try:
if "google" not in method:
file_path = AudioRecorder().listen()
with open(file_path, "rb") as f:
transcript = await gpt.transcript(file=f, language="en")
else:
transcript = CustomTranscriptor(language="en-US").transcript()
if transcript:
print(transcript)
response = await ask_chat(transcript)
except KeyboardInterrupt:
break
asyncio.run(main())
Текст-в-речь
Настало время научить говорить наш искусственный интеллект. Здесь, к сожалению, нет решения из коробки, если вы работаете c LLM напрямую. Для преобразования текста в голос нужно использовать одну из реализаций TTS.
Первый вариант - использовать библиотеку gtts от Google. В таком случае, gtts создаст файл с озвучкой, который нужно будет воспроизвести в каком-либо плеере, а затем удалить. Чтобы не плодить сущностей, я использую pydub.playback.
import os
import tempfile
from uuid import uuid4
from gtts import gTTS
from pydub import AudioSegment, playback
def process_via_gtts(text):
temp_dir = tempfile.gettempdir()
tts = gTTS(text, lang="en")
raw_file = f"{temp_dir}/{str(uuid4())}.mp3"
tts.save(raw_file)
audio = AudioSegment.from_file(raw_file, format="mp3").speedup(1.3) # haste a bit
os.remove(raw_file)
playback.play(audio)
Второй вариант - использовать библиотеку pyttsx. В отличие от gtts, синтезация речи происходит на лету в цикле, что удобнее и быстрее при потоковой трансляции текста. Ну и плюс библиотека не требует подключения к интернету.
from time import sleep
from pyttsx4 import init as pyttsx_init
def process_via_pytts(text):
"""
Converts text to speach using python-tts text-to-speach method
:param text: Text needs to be converted to speach.
"""
engine = pyttsx_init()
engine.setProperty("voice", 'com.apple.voice.enhanced.ru-RU.Katya')
engine.say(text)
engine.startLoop(False)
while engine.isBusy():
engine.iterate()
sleep(0.1)
engine.endLoop()
Озвучка производится системными голосами, будет отличаться в зависимости от системы, но вы можете установить любой доступный. Например посмотреть список голосов можно так:
engine = pyttsx_init()
engine.getProperty("voices")
Собственно, чтобы собрать воедино, получаем ответ от чата и воспроизводим его через какой-либо tts движок.
import asyncio
from utils.audio_recorder import AudioRecorder
from utils.transcriptors import CustomTranscriptor
from utils.tts import process_via_gtts, process_via_pytts
async def tts_process(text, method):
"""
Converts text to speach using pre-defined model
:param text: Text needs to be converted to speach.
:param method: method of tts
"""
if "google" in method:
process_via_gtts(text)
else:
process_via_pytts(text)
async def main():
method = "google"
while True:
try:
if "google" not in method:
file_path = AudioRecorder().listen()
with open(file_path, "rb") as f:
transcript = await gpt.transcript(file=f, language="en")
else:
transcript = CustomTranscriptor(language="en-US").transcript()
if transcript:
print(transcript)
response = await ask_chat(transcript) # this method returns string of whole chatbot response
await tts_process(response, "not google")
except KeyboardInterrupt:
break
asyncio.run(main())
Нюансы человечности
Как в замечательном анекдоте про Чапаева: "но есть один нюанс". Получение ответа от чат-бота занимает какое-то время, в зависимости от модели, оно может быть и достаточно продолжительным. При использовании tts мы должны дождаться получения полного ответа и начать воспроизведение голоса, что ещё увеличивает время конечного ответа. Когда я только начинал свои эксперименты, это рушило всю магию живого общения и вызывало только раздражение и желание вернуться к старому-доброму текстовому общению. Но не всё так плохо. Скажу по правде, я просто влюблён в метод stream в ChatGPT, который возвращает ответ налету из ChatCompletion. Так что моя идея состоит в том, чтобы вызывать tts сразу, как только получено что-то в ответе от бота. Но наверное те, кто пользовались этой фичей, наверняка знают, что возвращаться может что угодно - как слова, так и предложения или отдельные буквы. И это проблема, если вы попытаетесь запустить tts на каждый получаемый chunk.
Hidden text
{
"id": "chatcmpl-ABCABC",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-3.5-turbo",
"choices": [
{
"index": 0,
"delta": {
"content": "Hel"
},
"finish_reason": null
}
]
}
{
"id": "chatcmpl-ABCABC",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-3.5-turbo",
"choices": [
{
"index": 1,
"delta": {
"content": "lo, "
},
"finish_reason": null
}
]
}
<...>
{
"id": "chatcmpl-ABCABC",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-3.5-turbo",
"choices": [
{
"index": 12,
"delta": {
"content": "ay?"
},
"finish_reason": null
}
]
}
Первая итерация: давайте дожидаться получения целого слова, и только потом начинать озвучку.
import string
import sys
from utils.tts import tts_process
async def ask_chat(user_input):
full_response = ""
word = ""
async for response in gpt.str_chat(user_input):
for char in response:
word += char
if char in string.whitespace or char in string.punctuation:
if word:
tts_process(word)
word = ""
sys.stdout.write(char) # I use direct stdout output to make output be printed on-the-fly.
sys.stdout.flush() # To get typewriter effect I forcefully flush output each time.
full_response += char
print("\n")
return full_response # if we'll need whole prompt for some reasons later
Результат, честно говоря, получится так себе - рваным. Пожалуй, неплохой идеей является дожидаться получения нескольких слов, например 2-3 и озвучивать их. Слова будем складывать в асинхронную очередь, и проверять в параллельно запущенной задаче.
import string
import sys
import asyncio
from utils.tts import tts_process
prompt_queue = asyncio.Queue()
async def ask_chat(user_input):
full_response = ""
word = ""
async for response in gpt.str_chat(user_input):
for char in response:
word += char
if char in string.whitespace or char in string.punctuation:
if word:
await prompt_queue.put(word)
word = ""
sys.stdout.write(char) # I use direct stdout output to make output be printed on-the-fly.
sys.stdout.flush() # To get typewriter effect I forcefully flush output each time.
full_response += char
print("\n")
return full_response # if we'll need whole prompt for some reasons later
async def tts_task():
limit = 3
empty_counter = 0
while True:
if prompt_queue.empty():
empty_counter += 1
if empty_counter >= 3:
limit = 3
empty_counter = 0
words = []
# Get all available words
limit_counter = 0
while len(words) < limit:
print(len(words))
try:
word = await asyncio.wait_for(prompt_queue.get(), timeout=1)
words.extend(word.split())
if len(words) >= limit:
break
except asyncio.TimeoutError:
limit_counter += 1
if limit_counter >= 10:
limit = 1
# If we have at least limit words or queue was empty 3 times, process them
if len(words) >= limit:
text = " ".join(words)
await tts.process(text)
limit = 1
async def main():
asyncio.create_task(tts_task())
# and rest of the code
Это звучит уже неплохо, но в процессе обработки теряются интонации и знаки препинания. Финально, давайте сделаем предположение, что следует обрабатывать только предложения, ну или их части, то есть куски, которые будут заканчиваться символами ".?!,;:".
import string
import sys
import asyncio
from utils.tts import tts_process
prompt_queue = asyncio.Queue()
async def ask_chat(user_input):
full_response = ""
word = ""
async for response in gpt.str_chat(user_input):
for char in response:
word += char
if char in string.whitespace or char in string.punctuation:
if word:
await prompt_queue.put(word)
word = ""
sys.stdout.write(char) # I use direct stdout output to make output be printed on-the-fly.
sys.stdout.flush() # To get typewriter effect I forcefully flush output each time.
full_response += char
print("\n")
return full_response # if we'll need whole prompt for some reasons later
async def tts_sentence_task():
punctuation_marks = ".?!,;:"
sentence = ""
while True:
try:
word = await asyncio.wait_for(prompt_queue.get(), timeout=0.5)
sentence += " " + word
# If the last character is a punctuation mark, process the sentence
if sentence[-1] in punctuation_marks:
await tts_process(sentence)
sentence = ""
except Exception as error:
pass
async def main():
asyncio.create_task(tts_sentence_task())
# and rest of the code
Если вы попробуете мои примеры, то обратите внимание, что во время озвучки прерывается вывод чата. Чтобы исправить это, нам нужно запускать tts в отдельном потоке. Чтобы это сделать, нам нужно будет формировать вторую очередь для tts. И заводить ещё одну параллельную задачу для обработчика.
import string
import sys
import asyncio
from utils.tts import tts_process
prompt_queue = asyncio.Queue()
tts_queue = asyncio.Queue()
async def ask_chat(user_input):
# same
async def tts_sentence_task():
punctuation_marks = ".?!,;:"
sentence = ""
while True:
try:
word = await asyncio.wait_for(prompt_queue.get(), timeout=0.5)
sentence += " " + word
# If the last character is a punctuation mark, process the sentence
if sentence[-1] in punctuation_marks:
await tts_queue.put(sentence)
sentence = ""
except Exception as error:
pass
async def tts_worker():
while True:
sentence = await tts_queue.get()
if sentence:
await tts_process(sentence)
tts_queue.task_done()
async def main():
asyncio.create_task(tts_sentence_task())
asyncio.create_task(tts_worker())
# and rest of the code
И тем не менее задача не решена, потому что, увы, методы tts (что gtts, что pyttsx) являются синхронными. Это значит, что на время озвучки выполнение основного цикла блокируется, и ожидает выполнения синхронной задачи. Чтобы решить эту проблему, следует, например, запускать плееры в отдельных потоках. Проще всего это сделать, используя библиотеку threading.
import threading
async def process_via_gtts(text):
"""
Converts text to speach using gtts text-to-speach method
:param text: Text needs to be converted to speach.
"""
temp_dir = tempfile.gettempdir()
tts = gTTS(text, lang="en")
raw_file = f"{temp_dir}/{str(uuid4())}.mp3"
tts.save(raw_file)
audio = AudioSegment.from_file(raw_file, format="mp3").speedup(1.3)
os.remove(raw_file)
player_thread = threading.Thread(target=playback.play(audio), args=(audio,))
player_thread.start()
async def tts_process(text):
"""
Converts text to speach using pre-defined model
:param text: Text needs to be converted to speach.
"""
if "google" in self.___method:
await self.__process_via_gtts(text)
else:
player_thread = threading.Thread(target=process_via_pytts, args=(text,))
player_thread.start()
В данном случае мы получим новую проблему - теперь tts будут воспроизводиться как только в очереди появится новое предложение. Если к тому времени, пока озвучивается первое предложение будет получено второе, то начнётся его воспроизведение, затем третье, и получится какофония. Чтобы этого избежать, финально, нужно использовать механизм семафоров. Перед произведением проверяем и ждём освобождения семафора, а по его завершению - отпускаем семафор.
import threading
semaphore = threading.Semaphore(1)
def play_audio(self, audio):
""" Service method to play audio in monopoly mode using pydub
:param audio: AudioSegment needs to be played.
"""
playback.play(audio)
semaphore.release()
async def process_via_gtts(text):
"""
Converts text to speach using gtts text-to-speach method
:param text: Text needs to be converted to speach.
"""
temp_dir = tempfile.gettempdir()
tts = gTTS(text, lang=self.___lang)
raw_file = f"{temp_dir}/{str(uuid4())}.mp3"
tts.save(raw_file)
audio = AudioSegment.from_file(raw_file, format="mp3").speedup(self.___speedup)
os.remove(raw_file)
semaphore.acquire()
player_thread = threading.Thread(target=self.play_audio, args=(audio,))
player_thread.start()
def process_via_pytts(text):
"""
Converts text to speach using python-tts text-to-speach method
:param text: Text needs to be converted to speach.
"""
engine = self.___pytts
engine.setProperty("voice", self.___voice)
engine.say(text)
engine.startLoop(False)
while engine.isBusy():
engine.iterate()
sleep(self.___frame)
engine.endLoop()
semaphore.release()
async def process(text):
"""
Converts text to speach using pre-defined model
:param text: Text needs to be converted to speach.
"""
if "google" in self.___method:
await self.__process_via_gtts(text)
else:
semaphore.acquire()
player_thread = threading.Thread(target=self.__process_via_pytts, args=(text,))
player_thread.start()
В заключение
Зачем оно нужно? Тут каждый может ответить в зависимости от своих задач и потребностей. Мне было любопытно поизучать возможные пути реализации "естественного общения" с чат-ботами. Например мой бот может быть персональным ассистентом, доступным под рукой в любой момент, и ведущий себя так, как я ожидаю от него. Ну, скажем, можно попросить налету узнать текущую погоду или нарисовать красивую девушку некроманта верхом на белой лошади.
На этом всё. Если этот материал был вам полезен, то не забудьте поставить лайк этому посту, написать комментарий, а так же, если воодушевитесь - поделиться монеткой. Оригинал статьи тут.
Комментарии (7)
theurus
07.09.2023 10:03+1Для tts еще можно использовать бесплатный микрософт https://pypi.org/project/edge-tts/
Там 2 русских голоса и лимиты намного больше чем у гугла, можно даже книги озвучивать.
Вместо чатгпт можно использовать гугол барда, он отвечает так быстро что ему не нужен стриминг.
Потестить 4 разных чатбота с поддержкой голоса можно в телеграме у бота https://t.me/kun4sun_bot
wwakabobik Автор
07.09.2023 10:03Да, спасибо, я вчера вечером как раз сидел, смотрел pypi, ещё ряд TTS-ок нашёл. На edge-tts тоже посмотрю.
А насчёт Барда, наверное рано пока что, пока нет официального api. Через установку cookie рано или поздно гугл временно банит и ловишь 429 через десяток запросов. Ну или я не знаю альтернивы методу https://github.com/dsdanielpark/Bard-API :)theurus
07.09.2023 10:03+1Альтернатива - барабан с несколькими ключами от барда Ж) Для чатГПТ тоже полезная штука, позволяет использовать подарочные ключи у которых ограничение в 3 запроса в минуту, с барабаном на несколько ключей это ограничение фактически снимается.
evgensoft
07.09.2023 10:03+1Если локально TTS будет работать - то и распознавать речь локально можно - High-performance inference of OpenAI's Whisper automatic speech recognition (ASR) model (https://github.com/ggerganov/whisper.cpp)
P.S. Да и запускать локально можно нейросетку - https://github.com/ggerganov/llama.cpp
AcckiyGerman
О, круто, сам о такой озвучке думал, но руки не дошли.
Есть ли демка или видеозапись о том, как это работает?
Нашёл: https://youtu.be/Ph7EQSZPmGc?t=844
wwakabobik Автор
Ага :) В видео видно, что AudioRecorder ждёт после окончания голоса 2с, по-хорошему надо подтюнить, чтобы быстрее реагировал.