
Иногда мы с товарищами собираемся, чтобы сыграть несколько забегов в игре PUBG: Battlegrounds. Взаимодействие между участниками команды (сквада) тут возведено в абсолют, и голосовая коммуникация жизненно необходима для успешного выживания. В PUBG нельзя расслабляться ни на секунду. Частенько бывают ситуации, когда нужно на слух определять местоположение соперника. Любой посторонний звук в такой момент способен испортить матч всей команде.
Чтобы минимизировать ситуации, когда мои домочадцы пытаются со мной заговорить во время игры, я решил поставить световое табло ON AIR, похожее на то, которое используют на радиостанциях (и да, после такой наглости я даже выжил). Задумка была в том, чтобы табло автоматически зажигалось, когда я нахожусь на сервере TeamSpeak, и отключалось после дисконнекта. Что в итоге у меня получилось — читайте дальше.

Прежде всего я раздобыл готовое табло. Разумеется, можно было бы заморочиться с неоном, но я решил обойтись обычным LED. Цена на такую штуку не слишком велика, но вот с автоматизацией придется повозиться. Первое, что я сделал, — проверил ток потребления:

1,44 А — это действительно много, чтобы брать напрямую с RPI. Запитывать придется от отдельного блока. Управлять буду с помощью реле. Нормальные люди обычно покупают готовые модули, но ждать три недели с маркетплейса не хотелось. Под рукой же лежало реле TRJ-5VDC-SA-CD:

Достаточно подать на управляющие контакты 5 В 40 мА — и оно успешно сработает. Проблемы две: GPIO Raspberry Pi оперирует напряжением 3,3 V, а максимум c такого пина можно отдать ~16 мА. Тем не менее на самой гребенке два пина 5 V, которые по факту выдают до 1–2 А в зависимости от блока питания и загруженности платы. Само реле для удержания в открытом состоянии будет потреблять 40–70 мА. Получается, нужно собрать простейший драйвер.
Плюс не стоит забывать, что реле управляется катушкой, которая благодаря коварной физике накапливает в себе магнитную энергию. Достаточно резко оборвать ток — и напряжение на выводах способно подскочить и выжечь нафиг наш нежный GPIO (привет, обратная ЭДС). Так что надо будет подумать о защите в виде диода.
Делаю драйвер
Порывшись в коробочках с радиодеталями (у каждого мужчины такая со временем появляется), я нашел NPN-транзистор КТ3102 в корпусе TO-92 и сигнальный кремниевый диод 1N4148. Вначале я рассчитывал, что мне для открытия транзистора будет достаточно ~2,6 мА, поэтому ошибочно взял резистор на 1кΩ.
Дальше собрал схему:
+5 V (pin 2) — анод 1N4148;
катод 1N4148 — катушка управления;
катушка управления — эмиттер транзистора;
GPIO 17 (pin 11) — резистор;
резистор — база транзистора;
GND (pin 6) — коллектор транзистора.

Быстро спаял навесным монтажом и подключил к «малинке». Чтобы было проще, установил Node-Red при помощи волшебного скрипта:
$ bash <(curl -sL https://github.com/node-red/linux-installers/releases/latest/download/update-nodejs-and-nodered-deb)
Для автостарта Node-Red выполнил:
$ sudo systemctl enable nodered.service
Теперь можно проверить переключение реле, подавая 1 и 0 на GPIO 17 с помощью штатной ноды Inject в связке с rpi-gpio out:

Первый блин комом — вместо отчетливого щелчка я услышал еле-еле заметную попытку. Померял напряжение мультиметром, а там всего лишь 2 V. Значит, транзистор не ушел в насыщение и не открылся. Поэтому заменил номинал резистора (поставил 470Ω вместо 1кΩ). Ток с GPIO-пина вырос до ~5,6 мА — вполне безопасное значение. Проверил — реле уверенно переключается.
MQTT
Теперь пришла пора поставить брокер сообщений Mosquitto и настроить базовую авторизацию по логину и паролю:
$ sudo apt install -y mosquitto mosquitto-clients
Как и Node-Red, имеет смысл добавить Mosquitto в автостарт:
$ sudo systemctl enable mosquitto
В комплекте есть специальная утилита, позволяющая безопасно хранить заданный пароль для указанного пользователя:
$ sudo mosquitto_passwd -c /etc/mosquitto/passwd nodered
По умолчанию Mosquitto читает конфиги в директории /etc/mosquitto/conf.d/:
$ sudo nano /etc/mosquitto/conf.d/local.conf
Прописываем туда порт, запрещаем подключаться без авторизации и говорим, где лежит файл с хешем пароля:
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd
Перечитываем конфиг и перезапускаем брокер:
$ sudo systemctl restart mosquitto
Теперь можно возвращаться в веб-интерфейс Node-Red и настроить работу через MQTT. В первую очередь надо добавить сервер. Так как все запущено на Raspberry, прописываем адрес 127.0.0.1 (или localhost):

Переходим на вкладку Security и указываем там ранее созданного пользователя и пароль nodered / password:

Слушать мы будем определенный топик. Назовем его для примера state:

Переделываем тестовую схему — вместо прямого соединения добавляем ноды MQTT в качестве посредника. При нажатии кнопки Deploy снизу каждой из них появится актуальный статус. Если все настроено правильно, то будет надпись connected:

Проверив еще раз, что отправка сообщения переключает реле, соединяем его выводы COM и NO (Normally Open) в разрыв жилы +5 V USB-кабеля. Проще всего взять отдельный USB-удлинитель и именно у него перерезать жилу. Это даст возможность управлять так любым USB-прибором, а не только конкретным табло.
Итак, первая часть проекта завершена. Поднят рабочий сервер Node-Red с брокером Mosquitto, который ожидает сообщения в топике state с 1 или 0 и дает команду на реле для включения табло. Теперь пора написать приложение, которое будет считывать данные с сервера TeamSpeak и отсылать сообщение MQTT.
Soft
У TeamSpeak есть два режима ServerQuery:
RAW (читай Telnet) на 10011.
SSH (да-да, он самый) на 10022.
Первый вариант хорош тем, что туда можно подключаться обычным сокетом, читать и писать строки, а также вытворять прочие безумства. Альтернатива — SSH. Все то же самое, только безопасно. Приложение будет на Python 3.11, к которому я дополнительно установлю пару пакетов с клиентами SSH (paramiko) и MQTT (paho-mqtt).
$ sudo apt install python3-paho-mqtt python3-paramiko
Код, разумеется, писался в нормальной IDE, но для переноса воспользовался редактором nano:
$ sudo nano ts3_presence_to_mqtt.py
Полный код:
import socket, time, sys
from typing import Optional, Tuple
import paho.mqtt.client as mqtt
import paramiko
# ================== НАСТРОЙКИ ==================
TS3_HOST = "192.168.88.105"
TS3_PROTOCOL = "ssh"
TS3_QUERY_PORT = 10022
TS3_LOGIN = "serveradmin"
TS3_PASSWORD = "CHANGE_ME"
TS3_SID = 1
TS3_TARGET_NICK = "Test"
MQTT_HOST = "192.168.88.27"
MQTT_PORT = 1883
MQTT_USERNAME = "nodered"
MQTT_PASSWORD = "password"
MQTT_TOPIC = "state"
MQTT_QOS = 1
CHECK_PERIOD_SEC = 1.0
CONNECT_TIMEOUT = 5.0
RECONNECT_DELAY_SEC = 3.0
# ===============================================
def ts3_unescape(s: str) -> str:
rep = {r"\\s":" ", r"\\p":"|", r"\\/":"/", r"\\n":"\n", r"\\r":"\r", r"\\t":"\t", r"\\v":"\v", r"\\\\":"\\"}
for k,v in rep.items(): s = s.replace(k,v)
return s
def parse_presence(payload: str, target_nick: str) -> bool:
if not payload: return False
for rec in payload.split("|"):
kv = {}
for part in rec.split():
if "=" in part:
k,v = part.split("=",1)
kv[k] = ts3_unescape(v)
if kv.get("client_nickname") == target_nick:
return True
return False
class TS3Raw:
def __init__(self, host, port, user, password, sid):
self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid
self.sock: Optional[socket.socket] = None
def _send(self, cmd: str):
self.sock.sendall((cmd.strip()+"\n").encode())
def _read_until_ok(self) -> Tuple[str, dict]:
self.sock.settimeout(CONNECT_TIMEOUT)
buf = b""
while True:
chunk = self.sock.recv(4096)
if not chunk: raise ConnectionError("TS3 socket closed")
buf += chunk
text = buf.decode("utf-8","replace")
if "\nerror " in text or text.endswith("error "):
lines = text.strip().splitlines()
if not lines or not lines[-1].startswith("error "): continue
err_line = lines[-1][6:]
err = {}
for p in err_line.split():
if "=" in p:
k,v = p.split("=",1)
err[k] = ts3_unescape(v)
payload = "\n".join(lines[:-1])
return payload, err
def connect(self):
self.sock = socket.create_connection((self.host, self.port), timeout=CONNECT_TIMEOUT)
self.sock.settimeout(1.0)
try: _ = self.sock.recv(4096)
except Exception: pass
self._send(f"login client_login_name={self.user} client_login_password={self.password}")
_, err = self._read_until_ok()
if err.get("id") != "0": raise PermissionError(f"login failed: {err}")
self._send(f"use sid={self.sid}")
_, err = self._read_until_ok()
if err.get("id") != "0": raise RuntimeError(f"use failed: {err}")
def clientlist(self) -> str:
self._send("clientlist")
payload, err = self._read_until_ok()
if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")
return payload
def close(self):
try:
if self.sock:
try: self._send("quit")
except Exception: pass
self.sock.close()
finally:
self.sock = None
class TS3SSH:
def __init__(self, host, port, user, password, sid):
self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid
self.ssh: Optional[paramiko.SSHClient] = None
self.chan: Optional[paramiko.Channel] = None
self.buf = ""
def connect(self):
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(self.host, port=self.port, username=self.user, password=self.password, timeout=CONNECT_TIMEOUT, allow_agent=False, look_for_keys=False)
self.chan = self.ssh.invoke_shell(width=160, height=24)
self.chan.settimeout(CONNECT_TIMEOUT)
time.sleep(0.2)
self._drain()
self._send(f"use sid={self.sid}")
self._read_until_ok()
def _send(self, cmd: str):
self.chan.send((cmd.strip()+"\n").encode())
def _drain(self):
try:
while self.chan.recv_ready():
self.buf += self.chan.recv(4096).decode("utf-8","replace")
except Exception:
pass
def _read_until_ok(self) -> Tuple[str, dict]:
deadline = time.time() + CONNECT_TIMEOUT
while time.time() < deadline:
self._drain()
if "\nerror " in self.buf:
text = self.buf
self.buf = ""
lines = text.strip().splitlines()
if not lines: continue
idx = None
for i in range(len(lines)-1, -1, -1):
if lines[i].startswith("error "):
idx = i; break
if idx is None: continue
err_line = lines[idx][6:]
err = {}
for p in err_line.split():
if "=" in p:
k,v = p.split("=",1)
err[k] = ts3_unescape(v)
payload = "\n".join(lines[:idx])
return payload, err
time.sleep(0.05)
raise TimeoutError("TS3 SSH read timeout")
def clientlist(self) -> str:
self._send("clientlist")
payload, err = self._read_until_ok()
if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")
return payload
def close(self):
try:
if self.chan:
try: self._send("quit")
except Exception: pass
self.chan.close()
finally:
if self.ssh:
self.ssh.close()
self.chan = None
self.ssh = None
class MqttPublisher:
def __init__(self, host, port, username, password, topic, qos=1):
self.client = mqtt.Client()
if username: self.client.username_pw_set(username, password)
self.topic, self.qos = topic, qos
self.client.connect_async(host, port, keepalive=30)
self.client.loop_start()
def publish_state(self, v:int): self.client.publish(self.topic, str(v), qos=self.qos, retain=True)
def make_ts3():
if TS3_PROTOCOL == "ssh":
return TS3SSH(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)
else:
return TS3Raw(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)
def main():
ts3 = make_ts3()
mqtt_pub = MqttPublisher(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC, MQTT_QOS)
last = None
while True:
try:
if isinstance(ts3, TS3Raw) and ts3.sock is None: ts3.connect()
if isinstance(ts3, TS3SSH) and ts3.chan is None: ts3.connect()
payload = ts3.clientlist()
value = 1 if parse_presence(payload, TS3_TARGET_NICK) else 0
except Exception as e:
sys.stderr.write(f"[WARN] TS3 check failed: {e}\n")
value = 0
try: ts3.close()
except Exception: pass
time.sleep(RECONNECT_DELAY_SEC)
if value != last:
mqtt_pub.publish_state(value)
last = value
time.sleep(CHECK_PERIOD_SEC)
if __name__ == "__main__":
try: main()
except KeyboardInterrupt: pass
Теперь достаточно запустить приложение в фоновом режиме следующей командой:
$ python ts3_presence_to_mqtt.py &
Как только вы подключитесь к серверу TeamSpeak с именем Test, реле щелкнет и табло незамедлительно загорится. Ну а при дисконнекте — потухнет. По умолчанию процесс подключения к серверу происходит разово, а далее оно запрашивает статус каждую секунду. При необходимости интервал можно увеличить, заменив значение переменной CHECK_PERIOD_SEC.
Что получилось
Этот небольшой DIY-проект оказался довольно интересным в реализации. Разумеется, было бы проще взять модуль с одним или несколькими реле, но и самодельный драйвер работает без каких-либо проблем. Программный код в целом тоже получился несложным — нужно было учесть, что TS3 Server Query экранирует спецсимволы, — и накидать собственный вариант «декодера». Для однозначного определения завершения любого ответа реализован парсинг до строки вида error id=[код] msg=[сообщение].
Чтобы комфортно работать с постоянным подключением к серверу, был написан отдельный класс, который занимается всем — от установки соединения с чтения баннера приветствия (при его наличии) до корректного завершения с закрытием сокета. Еще один класс — минимальная обертка над paho-mqtt с асинхронным подключением к брокеру и фоновым loop_start(), при котором клиент сам держит соединение и отправляет keepalive.
От сервера мы получаем полный ответ clientlist, который парсим с помощью функции parse_presence. Записи там идут с разделителем «|», а каждая из них по факту пара «ключ=значение» через пробел. В итоге достаточно собрать словарь полей, снять экранирование спецсимволов и проверить соответствие никнейма из «Настроек» полученному ответу. На основании этого отправить 1 или 0 по MQTT.
Ну что, как вам самоделка?
vesowoma
Обычно диод подключают параллельно обмотке реле, так чтобы штатно через диод ток не шел.
В приведенной в статье схеме диод защищать будет, но на нем впустую будет падение напряжения.