
В предыдущей статье я начал разработку open-source веб-приложения для стриминга видео с веб-камеры и управления роботом. Написал фронтенд, который принимает видеопоток от mjpg_streamer, а также отправляет команды через WebSocket на бэкенд, написанный на FastAPI.
Во второй части я расскажу, как отправлять команды роботу с WebSocket-бэкенда. Мой робот работает на плате Orange Pi Zero, передавая и принимая информацию через Wi-Fi. Я покажу, как настроить сервер Nginx на роботе в качестве обратного прокси, а также напишу Python-код для приёма команд с веб-приложения.
Статья будет полезна любителям робототехники и веб-программистам, интересующимся фреймворком FastAPI. Я продемонстрирую работу с несколькими WebSocket-соединениями в одном веб-приложении, а также покажу настройку Orange Pi Zero для работы.
Настройка одноплатника
В роли одноплатника выступит плата Orange pi zero. Это небольшой компьютер, на который может быть установлен консольный Linux Armbian или другой совместимый с ней.
Armbian — это оптимизированная сборка Linux, предназначенная для одноплатных компьютеров (SBC). Она обеспечивает стабильность и производительность на различных ARM-устройствах.
Он основан на Debian, потому что этот дистрибутив предоставляет надежную, гибкую основу с хорошей поддержкой для работы с серверными и встраиваемыми системами. Armbian адаптирует Debian, добавляя специфические оптимизации, драйверы и инструменты для удобного управления SBC.
Пример первоначальной настройки можно найти в статье Orange Pi Zero: Установка и настройка системы. Несмотря на то, что статье уже 8 лет, я вполне смог настроить Orange pi zero, опираясь на информацию оттуда.
Как это бывает обычно, я сделал некоторые вещи по-своему. Например, установил Armbian с помощью Raspberry Pi Imager. Впрочем, способ установки не имеет особого значения — главное, чтобы операционная система была установлена и готова к работе.
После установки ОС на microSD-карту (в моём случае объёмом 8 ГБ) её нужно вставить в Orange Pi и подключить плату к блоку питания. Я использую блок питания от Raspberry Pi с выходными параметрами 5.1V 2.5A.
Далее я подключил плату к сетевому кабелю в роутере и подал питание. В настройках роутера я нашёл Orange Pi как сетевое устройство и определил его IP-адрес, который нужен для подключения к одноплатнику по SSH через консоль.
Подключение к Armbian по ssh:
ssh root@ip_orange_pi
После успешного подключения появится информация об Armbian:

На скриншоте отображена системная информация, включая температуру процессора (CPU temp: 58 °C) и другие параметры.
Затем я выполнил первоначальную настройку платы. Пример настройки можно найти в статье, которую я прикрепил выше. Самое важное — настроить Wi-Fi и статический IP-адрес для устройства.
Дополнительно рекомендую настроить DNS. Без корректно настроенных DNS-серверов устройство не сможет преобразовывать доменные имена (например, example.com
) в IP-адреса, необходимые для связи с серверами в интернете.
Команда настройки DNS:
sudo nano /etc/systemd/resolved.conf
Содержимое resolved.conf
:
DNS=8.8.8.8 1.1.1.1
FallbackDNS=8.8.4.4 1.0.0.1
Я использовал DNS от GOOGLE, но можно использовать DNS от Яндекс — тут дело вкуса.
Перезапуск службы для разрешения доменных имён и команда для проверки пинга:
sudo systemctl restart systemd-resolved
ping -c 5 google.com
Результат выполнения команды ping
(часть информации):
--- google.com ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4005ms
Разбор информации:
5 packets transmitted — отправлено 5 пакетов;
5 received — все 5 пакетов успешно получены обратно;
0% packet loss — нет потерь пакетов, соединение стабильное;
time 4005ms — общее время теста.
Установка git
для работы с репозиторием утилиты mjpg_streamer
:
sudo apt install git-core
Теперь нужно установить все утилиты, зависимости для работы с видеопотоком и утилиту для видеострима mjpg_streamer
. Вся информация по установке есть в моей предыдущей статье.
После установки выключаю Orange pi:
sudo shutdown -h now
Далее подключаю веб-камеру к usb Orange pi, вытаскиваю интернет-кабель и подаю питание на блок питания, чтобы включить её.
На этот раз уже подключаюсь через Wi-Fi, используя статический ip:
ssh root@ip_orange_pi
Настройка виртуального окружения:
sudo apt install python3-dev -y
sudo apt install -y python3-venv
Разбор команд:
sudo apt install python3-dev -y
— устанавливает заголовочные файлы Python, необходимые для сборки некоторых зависимостей;sudo apt install -y python3-venv
— устанавливает модульvenv
, который позволяет создавать виртуальные окружения.
python3 -m venv venv_robot
source venv_robot/bin/activate
Разбор команд:
python3 -m venv venv_robot
— создаёт виртуальное окружение venv_robot, изолируя зависимости проекта;source venv_robot/bin/activate
— активирует виртуальное окружение, чтобы пакеты устанавливались только внутри него.
pip install websockets
Разбор команды:
pip install websockets
— загружает библиотеку websockets для работы с WebSocket в Python.
Проверка видеопотока
Проверяю, видит ли система камеру как устройство:
ls /dev/video*
Сиcтема показываeт два устройства, связанные с моей веб-камерой:
/dev/video0 /dev/video1
Далее я решил не проводить большое количество тестов для камеры — их я уже делал в предыдущей статье, когда проверял работу на ПК. Дело в том, что на консольном Linux нет возможности сразу увидеть видео — там нет графической оболочки. Можно сохранить видео в файл, но зачем, если можно сразу запустить видеострим и проверить видеопоток на ПК, подключившись к URL видеострима.
sudo mjpg_streamer -i "input_uvc.so -d /dev/video0 -r 640x480 -f 15 -q 80" -o "output_http.so -p port_number -w /usr/local/share/mjpg-streamer/www" &
Проверка на ПК браузера:
http://ip_orange_pi:port_number/?action=stream
Я проверял подключение как через кабель, так и через Wi-Fi, и результат оказался разным. При подключении по Ethernet видео 640×480 транслировалось без задержек и подвисаний. При использовании Wi-Fi нагрузка была слишком высокой, что приводило к задержкам и подвисаниям при передаче видео.
Есть много факторов, влияющих на качество видеопотока:
Сила сигнала и потеря пакетов — препятствия в помещении ухудшают связь.
Пропускная способность и интерференция — другие беспроводные устройства могут создавать помехи для Wi-Fi одноплатного компьютера.
Обработка пакетов — данные по Ethernet передаются напрямую, тогда как Wi-Fi использует радиоканалы, требует подтверждения пакетов и загружен служебным трафиком.
Для устранения задержек можно уменьшить размер видео. У камеры есть режим 320×240, который снижает объём передаваемых данных, что должно минимизировать лаги.
Запускаю видеопоток с разрешением 320×240:
sudo mjpg_streamer -i "input_uvc.so -d /dev/video0 -r 320x240 -f 15 -q 80" -o "output_http.so -p port_number -w /usr/local/share/mjpg-streamer/www" &
Обновляю страницу в браузере. Это помогло — картинка перестала обновляться с задержкой, исчезли лаги. Сначала я переживал, что размер видео стал меньше, но оказалось, что многие подобные программы работают именно в таком низком разрешении из-за ограничений Wi-Fi и мощности устройств.
Обратный прокси на nginx
Теперь перейду к процессу взаимодействия робота и веб-приложения. У меня было два пути:
Запуск веб-приложения на самой плате Orange Pi с выполнением команд робота прямо в его бэкенде;
Запуск веб-приложения на ПК или VPS, связь с роботом через обратный прокси на Nginx, а затем выполнение команд с помощью сервиса на Python.
Первый вариант сильно нагружает ресурсы Orange Pi. Обработка видеопотока уже требует значительных вычислительных мощностей. Кроме того, я хочу расширять функционал веб-приложения, не снижая производительность робота.
Второй вариант снимает нагрузку с Orange Pi, оставляя его ресурсы для более важных задач. Поэтому он лучше подходит под мои цели и позволяет запускать веб-приложение как на ПК, так и на VPS.
Nginx — это высокопроизводительный веб-сервер и обратный прокси, который эффективно обрабатывает множество одновременных подключений и распределяет нагрузку.
Установка nginx:
sudo apt install nginx -y
Проверка статуса nginx:
sudo systemctl status nginx
Результат команды:
○ nginx.service - A high performance web server and a reverse proxy server
Loaded: loaded (/lib/systemd/system/nginx.service; enabled; preset: enabled)
Active: inactive (dead)
Docs: man:nginx(8)
Nginx сейчас не работает — статус inactive (dead) означает, что служба загружена в систему (Loaded), но не запущена.
Запуск службы:
sudo systemctl start nginx
Проверка статуса nginx:
sudo systemctl status nginx
Результат команды:
● nginx.service - A high performance web server and a reverse proxy server
Loaded: loaded (/lib/systemd/system/nginx.service; enabled; preset: enabled)
Active: active (running) since Tue 2025-05-20 23:51:07 MSK; 2min 22s ago
Docs: man:nginx(8)
Process: 1480 ExecStartPre=/usr/sbin/nginx -t -q -g daemon on; master_process on; (code=exited, status=0/SUCCESS)
Process: 1485 ExecStart=/usr/sbin/nginx -g daemon on; master_process on; (code=exited, status=0/SUCCESS)
Main PID: 1486 (nginx)
Tasks: 5 (limit: 882)
Memory: 2.7M
CPU: 150ms
CGroup: /system.slice/nginx.service
├─1486 "nginx: master process /usr/sbin/nginx -g daemon on; master_process on;"
├─1487 "nginx: worker process"
├─1488 "nginx: worker process"
├─1489 "nginx: worker process"
└─1490 "nginx: worker process"
Теперь Active: active (running) служба для nginx запущена.
Если ввести на ПК http://ip_orange_pi (заменить ip_orange_pi на реальный ip устройства), то можно увидеть сообщение от nginx:

Настало время настроить обратный прокси для перенаправления видеострима. Для этого нужно создать конфигурационный файл.
Команда для его создания в nano: reverse-proxy:
sudo nano /etc/nginx/sites-available/reverse-proxy
Внутри конфига:
server {
listen 80;
server_name ip_orange_pi;
location /stream {
proxy_pass http://localhost:port_number/?action=stream;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
Разбор конфига:
listen 80;
— сервер принимает запросы на 80-й порт (стандартный HTTP).server_name ip_orange_pi;
— заменитьip_orange_pi
на реальный ip Orange pi, по которому будут подключаться клиенты;location /stream
— запросы, начинающиесяс /stream
, будут перенаправлены через прокси;proxy_pass http://localhost:port_number/?action=stream;
— входящие запросы проксируются к локальному сервису на портуport_number
(заменить на нужный порт), где MJPG-streamer транслирует видео.proxy_set_header
— передает заголовки запроса от клиента в бэкенд, обеспечивая корректную обработку IP-адресов и протокола.
Как это работает:
Пользователь открывает http://ip_orange_pi/stream;
Nginx принимает запрос на 80-й порт и перенаправляет его на порт
port_number
(ввести нужный например 8070), где идёт видеопоток;Браузер отображает видео без необходимости напрямую указывать порт.
Активация конфигурации:
sudo ln -s /etc/nginx/sites-available/reverse-proxy /etc/nginx/sites-enabled/
sudo systemctl restart nginx
Создание символической ссылки:
ln -s
— создаёт символическую ссылку вместо копирования файла;/etc/nginx/sites-available/reverse-proxy
— конфигурационный файл прокси;/etc/nginx/sites-enabled/
— активная папка, куда загружается конфиг.
sudo systemctl restart nginx
— перезапуск nginx:
Полностью перезапускает веб-сервер, чтобы применить новые настройки;
Запускает прокси с новым конфигом и проверяет ошибки.
Проверка:
sudo nginx -T | grep reverse-proxy
Результат:
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
# configuration file /etc/nginx/sites-enabled/reverse-proxy:
nginx: configuration file /etc/nginx/nginx.conf test is successful
Разбор результата:
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
— nginx проверил конфигурационный файл и не обнаружил ошибок в синтаксисе;# configuration file /etc/nginx/sites-enabled/reverse-proxy:
— был загружен конфигурационный файл reverse-proxy, который был ранее написан;nginx: configuration file /etc/nginx/nginx.conf test is successful
— проверка конфига успешна.
Теперь видеострим можно проверить в веб-браузере на ПК.
Ввожу для проверки http://ip_orange_pi/stream (заменить ip_orange_pi на реальный ip устройства):

Изначально настраивал на /, а не /stream
, поэтому URL на картинке выглядит короче, но это не влияет ни на что, кроме адреса. Картинка заметно уменьшилась, зато теперь нет задержек.
Код робота
Структура проекта:
ROBOT-PI-SERVICE
├── .vscode
├── robot_pi_service
│ ├── _init_.py
│ ├── robot_pi_service.py
│ ├── settings.py
├── .env
├── .env.example
├── .gitignore
├── LICENSE
├── Makefile
├── README.md
├── requirements.txt
Разбор основных элементов структуры:
robot_pi_service.py
— сервис, который будет принимать команды и выполнять их;settings.py
— настройки для проекта;Makefile
— тут будут храниться команды запуска для сервиса;
Файл документации README.md:
# Robot-pi-service
**Robot-pi-service** - сервис для управления роботом на Orange Pi, работающий на Linux Armbian.
**Запуск сервиса (как проекта python3)** - `make run`
<details>
<summary><strong>Как оформлять ветки и коммиты</strong></summary>
Пример ветки `user_name/name_task`
- **user_name** (имя пользователя);
- **name_task** (название задачи).
Пример коммита `refactor: renaming a variable`
- **feat:** (новая функциональность кода, без учёта функционала для сборок);
- **devops:** (функционал для сборки — добавление, удаление и исправление);
- **fix:** (исправление ошибок функционального кода);
- **docs:** (изменения в документации);
- **style:** (форматирование, отсутствующие точки с запятой и т. п., без изменения производственного кода);
- **refactor:** (рефакторинг производственного кода, например, переименование переменной);
- **test:** (добавление недостающих тестов, рефакторинг тестов — без изменения производственного кода);
- **chore:** (обновление рутинных задач и т. д. — без изменения производственного кода).
Оформление основано на [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/)
</details>
Файл документации содержит описание проекта, команду запуска, а также правила оформления веток и коммитов.
Для удобства восприятия весь код будет оформлен в виде отдельных блоков, расположенных в файле сверху вниз по порядку.
settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
Разбор импортов:
BaseSettings
— базовый класс для управления настройками через .env файлы или переменные окружения;SettingsConfigDict
— позволяет задавать конфигурацию модели, например, путь к.env
и кодировку.
class ModelConfig(BaseSettings):
"""Модель конфига"""
model_config = SettingsConfigDict(
env_file='.env',
env_file_encoding='utf-8',
extra='ignore'
)
Разбор кода:
Наследуется от
BaseSettings
— позволяет автоматически загружать переменные из.env
или окружения;-
Использует
SettingsConfigDict
— управляет настройками, включая:—
env_file='.env'
— загружает переменные из.env
файла;—
env_file_encoding='utf-8'
— поддерживает UTF-8 кодировку;—
extra='ignore'
— игнорирует неизвестные переменные, избегая ошибок.
class CommandsRobot(ModelConfig):
"""Класс с командами для робота"""
forward: str
backward: str
left: str
right: str
Разбор кода:
Наследуется от
ModelConfig
, значит, загружает значения из.env
;Определяет команды робота в виде строк
(str)
, которые могут быть загружены из конфигурации:
—forward
— команда движения вперёд;
—backward
— команда движения назад;
—left
— команда поворота налево;
—right
— команда поворота направо.
class Settings(ModelConfig):
"""Класс для данных конфига"""
websocket_host: str
websocket_port: int
commands_robot: CommandsRobot = CommandsRobot()
settings = Settings()
Разбор кода:
Наследуется от
ModelConfig,
значит, подгружает.env
;Хранит основные параметры сервиса:
—websocket_host: str
— адрес WebSocket-сервера;
—websocket_port: int
— порт WebSocket-сервера;
—commands_robot: CommandsRobot
— вложенный класс команд для управления роботом;
—settings = Settings()
— создаёт объектsettings
, доступный для использования в коде.
robot_pi_service.py
import asyncio
from websockets import serve, exceptions
from websockets.legacy.server import WebSocketServerProtocol
from settings import settings
Разбор импортов:
asyncio
— стандартная библиотека Python для асинхронного программирования.serve
— функция для создания WebSocket-сервера, который слушает входящие подключения.exceptions
— набор исключений, связанных с WebSocket, например разрыв соединения или невалидное сообщение.WebSocketServerProtocol
— класс, описывающий протокол WebSocket для управления соединениями.settings:
— Загружает конфигурацию из файлаsettings.py
, содержащего параметры, такие какwebsocket_host
иwebsocket_port
;
— Позволяет использовать команды для робота, например,settings.commands_robot.forward
.
async def robot_control_gpio(websocket: WebSocketServerProtocol):
"""
Асинхронная функция для управлением gpio робота (через websocket)
"""
try:
while True:
try:
command = await asyncio.wait_for(websocket.recv(), timeout=30.0)
except asyncio.TimeoutError as err:
print('Таймаут ожидания команды от клиента')
continue # продолжение цикла, чтобы не закрывать соединение
match command:
case settings.commands_robot.forward:
action = 'Робот едет вперёд'
print(action)
await websocket.send(message=action)
case settings.commands_robot.backward:
action = 'Робот едет назад'
print(action)
await websocket.send(message=action)
case settings.commands_robot.left:
action = 'Робот едет налево'
print(action)
await websocket.send(message=action)
case settings.commands_robot.right:
action = 'Робот едет направо'
print(action)
await websocket.send(message=action)
except exceptions.ConnectionClosed:
pass
except (exceptions.ConnectionClosedOK, exceptions.InvalidMessage, exceptions.InvalidState) as err:
message_err = f'{err.__class__.__name__}: {err}'
print(message_err)
await websocket.send(message=message_err)
except Exception as err:
message_err = f'{err.__class__.__name__}: {err}'
print(message_err)
await websocket.send(message=message_err)
Разбор кода:
Асинхронная функция
robot_control_gpio
:
— Ожидает WebSocket-соединение (websocket: WebSocketServerProtocol
);
— Работает в бесконечном цикле, ожидая команды от клиента.
Получение команды от клиента:
— Используетawait websocket.recv()
для получения данных;
— Устанавливает таймаут 30 секунд(asyncio.wait_for)
;
— Если таймаут истекает, цикл продолжается без закрытия соединения.
Обработка команд (
match command
):
—forward
→ Робот едет вперёд;
—backward
→ Робот едет назад;
—left
→ Робот поворачивает налево;
—right
→ Робот поворачивает направо;
— Каждую команду отправляет обратно клиенту (await websocket.send
).
Обработка ошибок:
—ConnectionClosed
→ Игнорируется без действий;
—ConnectionClosedOK
,InvalidMessage
,InvalidState
→ Логирует ошибку и отправляет сообщение клиенту;
— Общий обработчик (Exception
) → Ловит любые неожиданные ошибки и логирует их.
async def start():
"""Асинхронная функция запуска вебсокета и бесконечного цикла"""
async with serve(handler=robot_control_gpio, host=settings.websocket_host, port=settings.websocket_port):
# бесконечный цикл
await asyncio.Future()
Разбор кода:
async def start():
— определяет асинхронную функциюstart
, которая запускает WebSocket-сервер.async with serve(handler=robot_control_gpio, host=settings.websocket_host, port=settings.websocket_port)
:
— Создаёт WebSocket-сервер, используяserve
;
—handler=robot_control_gpio
— указывает функцию-обработчик входящих сообщений;
—host=settings.websocket_host
— задаёт адрес сервера из настроек;
—port=settings.websocket_port
— устанавливает порт для подключения.
await asyncio.Future()
:
— Запускает бесконечный цикл, чтобы сервер продолжал работать;
—asyncio.Future()
— создаёт объект, который никогда не завершается, удерживая сервер активным.
def run_app():
"""Функция старата приложения"""
try:
asyncio.run(start())
except KeyboardInterrupt:
pass
Разбор кода:
def run_app():
— функция запускает приложение, вызывая asyncio.run(start());try:
— перехватывает возможные ошибки при запуске;except KeyboardInterrupt:
— позволяет корректно завершить работу при нажатии Ctrl+C.
if __name__ == '__main__':
run_app()
Разбор кода:
if name == '__main__':
— проверяет, что скрипт запущен напрямую, а не импортирован;run_app()
— запускает приложение.
robot_pi_service.py
является сервисом, потому что он запускается как WebSocket-сервер, постоянно работает в фоне и обрабатывает команды управления роботом в реальном времени.
Makefile
run:
chmod +x robot_pi_service/robot_pi_service.py
python3 robot_pi_service/robot_pi_service.py
Разбор файла:
Makefile
содержит правилоrun
, которое запускает сервис робота;chmod +x robot_pi_service/robot_pi_service.py
– делает файлrobot_pi_service.py
исполняемым, чтобы можно было запустить его напрямую;python3 robot_pi_service/robot_pi_service.py
– выполняет основной скрипт сервис, который управляет роботом через WebSocket.
Ссылка на получившийся в итоге open-source проект robot-pi-service.
Код веб-приложения
Речь пойдёт о веб-приложении web-robot_control, которое я начал писать в предыдущей статье. Я буду говорить лишь о новых частях кода.
Обновлённый settings.py
class CommandsRobot(ModelConfig):
"""Класс с командами для робота"""
forward: str
backward: str
left: str
right: str
def get_list_commands(self):
"""Метод вернёт список всех команд"""
return list(self.model_dump().values())
Код своей структурой повторяет аналогичный из robot_pi_service кроме метода get_list_commands
.
Разбор нового кода:
get_list_commands()
— вернёт все команды из класса CommandsRobot в виде списка;return list(self.model_dump().values())
— создаст словарь с атрибутами класса, а затем вернёт все значения в списке.
class Settings(ModelConfig):
"""Класс для данных конфига"""
stream_url: str
websocket_url_robot: str
commands_robot: CommandsRobot = CommandsRobot()
settings = Settings()
Разбор нового кода:
websocket_url_robot: str
— url для вебсокета робота;commands_robot: CommandsRobot = CommandsRobot()
:
—commands_robot: CommandsRobot
— объявляет атрибут классаSettings
, который должен быть объектомCommandsRobot
;
—= CommandsRobot()
— создаёт новый экземпляр классаCommandsRobot
и сразу назначает его этому атрибуту;
— Теперьsettings.commands_robot
будет содержать объект с командами для управления роботом.
Обновлённый views.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, WebSocketException
from fastapi.requests import Request
from fastapi.responses import HTMLResponse, Response
from fastapi.templating import Jinja2Templates
from websockets import exceptions, connect
import asyncio
import socket
from web_robot_control.settings import settings
Разбор новых импортов:
WebSocketDisconnect
— перехватывает событие разрыва WebSocket-соединения, например, если клиент отключился;WebSocketException
— обрабатывает ошибки, связанные с WebSocket, например неверный формат сообщений;exceptions
— содержит набор исключений WebSocket, таких как неверный URI или закрытие соединения;connect
— используется для установки WebSocket-соединения с удалённым сервером;asyncio
— библиотека для работы с асинхронным кодом, позволяет управлять задачами и потоками;socket
— модуль для низкоуровневой работы с сетевыми соединениями, включая WebSocket.
async def command_to_robot(command: str) -> str:
"""Асинхронная функция для отправки команды роботу через websockets"""
try:
async with connect(settings.websocket_url_robot) as robot_ws:
await robot_ws.send(command)
response = await robot_ws.recv()
return response
# Todo: для каждой ошибки написать своё сообщение
except (
exceptions.InvalidURI,
asyncio.TimeoutError,
exceptions.ConnectionClosedError,
exceptions.ConnectionClosedOK,
exceptions.InvalidHandshake,
ConnectionRefusedError,
socket.gaierror,
exceptions.InvalidMessage
) as err:
return f'{err.__class__.__name__}: {err}'
Разбор кода:
объявляет асинхронную функцию для отправки команды роботу через websockets;
устанавливает соединение с роботом по веб-сокету с помощью
connect(settings.websocket_url_robot)
;отправляет команду роботу через
robot_ws.send(command)
;получает ответ от робота
robot_ws.recv()
и возвращает его;перехватывает возможные ошибки, включая неверный uri, таймаут, закрытие соединения, отказ в подключении;
если возникает ошибка, возвращает её класс и описание в виде строки.
@router.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket) -> None:
# Установка соединения по веб-сокету
await websocket.accept()
try:
while True:
# Получение команды от клиента (с веб-сокета)
command = await websocket.receive_text()
valid_commands = settings.commands_robot.get_list_commands()
if command in valid_commands:
# Отправка команды роботу
robot_answer = await command_to_robot(command=command)
if robot_answer:
# Отправка ответа робота на веб-сокет фронтенда
await websocket.send_text(f'Получена команда: {command}, ответ робота: {robot_answer}')
print(f'Ответ робота: {robot_answer}')
except WebSocketDisconnect:
print('WebSocket отключен') # Todo: для вывода ошибок будет настроен logger
# Todo: для каждой ошибки написать своё сообщение
except (WebSocketException, exceptions.InvalidMessage) as err:
print(f'{err.__class__.__name__}: {err}')
Я убрал строку кода async with httpx.AsyncClient() as client:
, так как использую websocket для связи с роботом, а не http-запрос. Запросы http имеют большую задержку чем работа по вебсокет. Это критично для команды робота, которая должна срабатывать вовремя.
Разбор нового кода:
valid_commands = settings.commands_robot.get_list_commands()
— загружает список доступных команд робота из конфигурации;robot_answer = await command_to_robot(command=command)
— отправляет команду роботу и получает ответ через WebSocket;if robot_answer:
— проверяет, получил ли сервер ответ от робота перед отправкой клиенту;await websocket.send_text(f'Получена команда: {command}, ответ робота: {robot_answer}')
— отправляет ответ робота обратно на клиентский WebSocket;except (WebSocketException, exceptions.InvalidMessage) as err:
— перехватывает ошибки, связанные с WebSocket, например невалидные сообщения или неожиданные разрывы соединения;print(f'{err.__class__.__name__}: {err}')
— логирует ошибку, выводя её класс и описание.
websockets
используется вместе с WebSocket
из FastAPI, потому что FastAPI обрабатывает только серверную часть WebSocket, а для создания WebSocket-клиента** требуется websockets.connect()
. Это позволяет приложению не только принимать соединения от фронтенда, но и отправлять команды роботу по WebSocket.
Ссылка на получившийся в итоге open-source проект web-robot-control.
Веб-приложение в действии
Теперь настало время проверить приходят ли команды роботу и его ответ обратно на веб-приложение.
Запуск сервиса робота (из терминала Armbian на Orange pi):
source venv_robot/bin/activate
make run
Запуск веб-приложения (из терминала ПК):
poetry run uvicorn web_robot_control.main:app
Окно веб-приложения теперь похоже на ретро консоль:

При нажатии на кнопки получаю ответ от робота:

Веб-приложение успешно вывело видеопоток, а также передало команды роботу и получило от него ответ через WebSocket. В конце я отключил веб-приложение комбинацией клавиш Ctrl + C.
Заключение и планы на будущее
Я настроил плату Orange Pi Zero для работы. Настроил обратный прокси для видеопотока на Nginx. Написал код для робота, который принимает команды и отправляет ответы веб-приложению. Также написал код веб-приложения, которое передаёт команды роботу и получает его ответы. Робот успешно обработал команду и отправил ответ веб-приложению.
В следующей статье я планирую научить сервис робота управлять портами GPIO платы Orange Pi. GPIO — это управляющие пины на плате, которые могут взаимодействовать с датчиками и моторами. Также хочу подключить светодиоды, протестировать их управление и настроить задержку нажатия. Кроме того, я планирую использовать закодированные команды вместо передачи их в виде обычного текста.
Если у вас есть идеи по развитию проекта, поделитесь ими в комментариях — буду рад услышать ваши предложения!
Автор статьи @Arduinum
НЛО прилетело и оставило здесь промокод для читателей нашего блога:
— 15% на заказ любого VDS (кроме тарифа Прогрев) — HABRFIRSTVDS.