Пошаговый гайд по созданию полноценной исследовательской установки для Wi-Fi sensing с нуля
Привет, Хабр! Меня зовут Алексей, и сегодня мы переходим от теории к практике. В прошлой статье мы разобрались с концепцией Wi-Fi sensing, а сейчас я покажу, как собрать полноценную исследовательскую лабораторию в домашних условиях не дороже 9990 рублей. Статья содержит пошаговые инструкции от выбора оборудования до написания кода для детекции падений.
Что такое ESP32 и почему именно он?
Прежде чем перейти к сборке, давайте разберёмся, что такое ESP32 и почему именно эта платформа стала стандартом для IoT-проектов.
ESP32 — это микроконтроллер семейства System-on-Chip (SoC) от компании Espressif Systems, который произвёл революцию в мире любительской электроники. Представьте: на одном чипе размером с монету умещается полноценный компьютер с Wi-Fi, Bluetooth и мощным процессором.

Почему ESP32 — идеальный выбор для новичков:
1. Невысокая цена при мощных характеристиках:
Двухъядерный процессор Xtensa LX6 с частотой до 240 МГц
520 КБ SRAM и до 16 МБ Flash памяти
Wi-Fi 802.11 b/g/n и Bluetooth 4.2/BLE встроенные
Цена от 550 рублей за модуль!
2. Простота программирования: ESP32 можно программировать в привычной среде Arduino IDE, что делает его доступным даже для школьников. Не нужно изучать сложные фреймворки — весь код пишется на знакомом C++.
3. Огромное сообщество: Миллионы разработчиков по всему миру используют ESP32. Это означает море примеров кода, библиотек и готовых решений. Застряли с проблемой? Скорее всего, кто-то уже решил её и выложил решение в интернет.
Почему ESP32 идеален для Wi-Fi Sensing:
Нативная поддержка CSI (Channel State Information) — это ключевая особенность ESP32. В отличие от большинства микроконтроллеров, ESP32 может извлекать детальную информацию о состоянии Wi-Fi канала прямо из чипа. Обычно эти данные скрыты глубоко в прошивке роутера, но ESP32 предоставляет к ним прямой доступ.
Представьте CSI как «отпечатки пальцев» радиосигнала. Каждое движение в комнате, каждый новый объект оставляет свой след в этих данных. ESP32 умеет считывать эти «отпечатки» и передавать их вашему коду для анализа.
Обзор библиотек и фреймворков
Прежде чем перейти к сборке оборудования, давайте разберёмся с программной частью. Экосистема ESP32 и Python предлагает богатый выбор инструментов.
ESP32 библиотеки и инструменты:
1. ESP-CSI (GitHub)
Автор: Espressif Systems (создатели ESP32)
Что делает: Официальная библиотека для работы с CSI данными
Особенности: Поддержка всех режимов Wi-Fi, готовые примеры кода
Использование: Основа для любых проектов Wi-Fi sensing на ESP32
2. ESP32-CSI-Tool (GitHub)
Автор: Steven M. Hernandez, исследователь из США
Что делает: Исследовательский toolkit с готовыми проектами
Особенности: Активный/пассивный режимы сбора, автоматическое сохранение на SD-карту
Использование: Отличная стартовая точка для экспериментов
3. Arduino Core for ESP32 (Документация)
Автор: Espressif Systems
Что делает: Позволяет программировать ESP32 в привычной Arduino IDE
Особенности: Простой синтаксис, тысячи готовых библиотек
Использование: Для тех, кто не хочет изучать сложный ESP-IDF
Python библиотеки для анализа данных:
1. NumPy (numpy.org)
Что делает: Быстрые математические операции с массивами данных
Почему важно: CSI данные — это многомерные массивы комплексных чисел
Пример использования:
np.abs(csi_data)
для получения амплитуд сигнала
2. Matplotlib (matplotlib.org)
Что делает: Создание графиков и визуализация данных в реальном времени
Особенности: Поддержка анимации, интерактивные графики
В нашем проекте: Отображение CSI данных в реальном времени
3. Scikit-learn (scikit-learn.org)
Что делает: Машинное обучение без глубокого погружения в математику
Применение: Классификация движений, детекция аномалий (падений)
Плюсы: Простой API, отличная документация
4. PySerial (PyPI)
Что делает: Связь с ESP32 через USB/UART
Важность: Без него невозможно получать данные с микроконтроллера
Установка:
pip install pyserial
5. MQTT (Paho-MQTT) (PyPI)
Что делает: Беспроводная передача данных между устройствами
Применение: Сбор данных с нескольких ESP32 одновременно
Альтернатива: Если не хотите возиться с проводами
Список оборудования и где купить
Теперь переходим к самому интересному — закупке оборудования. Я тщательно подобрал компоненты по соотношению цена/качество, проверил наличие в российских магазинах и составил детальный список.Чтобы меня не обвинили в рекламе, я оставлю в таблице названия и примерные цены. Найти товары можно на WB и Озон или если хотите сэкономить бюджет и готовы подождать ищите на Aliexpress. Ну а если нужно очень срочно ищите оборудование в местных радиолюбительских магазинах.
Основные компоненты с актуальными ценами:
Компонент |
Количество |
Цена за шт. |
Общая стоимость |
---|---|---|---|
ESP32-WROOM-32D с внешним антенным коннектором |
3 шт |
550₽ |
1650₽ |
Антенна всенаправленная 2.4GHz 3dBi |
1 шт |
300₽ |
300₽ |
Антенна панельная секторная 10dBi |
1 шт |
1400₽ |
1400₽ |
Антенна Yagi направленная 24dBi |
1 шт |
1800₽ |
1800₽ |
Кабель IPEX-SMA 20см |
3 шт |
580₽ |
1740₽ |
Штативы для антенн регулируемые |
3 шт |
400₽ |
1200₽ |
Breadboard и провода |
1 комплект |
1200₽ |
1200₽ |
Общая стоимость: 9290₽ — уложились в бюджет!
Почему именно эти компоненты:
ESP32-WROOM-32D с внешним коннектором — обратите внимание, что нужна именно версия с возможностью подключения внешней антенны. Обычные ESP32 с встроенной антенной-змейкой не подойдут для серьёзных экспериментов.
Три типа антенн для разных задач:

Всенаправленная (3dBi) — для общего мониторинга комнаты
Панельная (10dBi) — для направленного покрытия определённого сектора
Yagi (24dBi) — для точечного наблюдения с максимальной чувствительностью
Штативы — критически важны! Антенны должны стоять устойчиво и их положение должно легко регулироваться. Не экономьте на этом.
Пошаговая сборка лаборатории
Шаг 1: Подготовка ESP32 — подключение и прошивка
Этот шаг — самый важный и одновременно самый сложный для новичков. Я подробно распишу каждое действие.
Установка среды разработки
Вариант 1: Arduino IDE (проще для новичков)
-
Скачиваем Arduino IDE
Идём на arduino.cc
Скачиваем версию 2.0 или выше
Устанавливаем как обычную программу
-
Добавляем поддержку ESP32
Открываем Arduino IDE
Идём в меню Файл → Настройки (File → Preferences)
-
В поле "Дополнительные ссылки для Менеджера плат" вставляем:
https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json
Нажимаем OK
-
Устанавливаем ESP32 плату
Идём в Инструменты → Плата → Менеджер плат
В поиске вводим "ESP32"
Находим "ESP32 by Espressif Systems" и нажимаем Установить
Ждём 5-10 минут (зависит от скорости интернета)
Вариант 2: ESP-IDF (для продвинутых)
ESP-IDF — это профессиональная среда разработки от Espressif. Она сложнее, но даёт больше возможностей для работы с CSI.
-
Установка через VS Code (рекомендуется)
Устанавливаем Visual Studio Code
Идём в Extensions (Ctrl+Shift+X)
Ищем "ESP-IDF" и устанавливаем расширение от Espressif
Нажимаем Ctrl+Shift+P, выбираем "ESP-IDF: Configure ESP-IDF Extension"
Выбираем Express, указываем версию ESP-IDF (рекомендую v5.1)
Нажимаем Install и ждём завершения (15-30 минут)
Подключение ESP32 к компьютеру
-
Подключение по USB
Берём качественный USB кабель (дешёвые часто не работают!)
Подключаем ESP32 к компьютеру
Устанавливаем драйверы, если Windows их не находит автоматически
-
Поиск COM-порта
Windows: Диспетчер устройств → Порты (COM и LPT) → ищем что-то вроде "Silicon Labs CP210x" или "CH340"
Linux/Mac: открываем терминал, вводим
ls /dev/tty*
и ищем/dev/ttyUSB0
или похожий
-
Проверка связи
Открываем Arduino IDE
Выбираем плату: Инструменты → Плата → ESP32 Arduino → DOIT ESP32 DEVKIT V1
Выбираем порт: Инструменты → Порт → COM3 (ваш номер может отличаться)
Открываем пример: Файл → Примеры → WiFi → WiFiScan
Нажимаем стрелочку (загрузить)
Если всё хорошо, увидим "Загрузка завершена"
Прошивка CSI библиотеки
Теперь самое интересное — загружаем специальную прошивку для работы с CSI.
Для Arduino IDE:
-
Устанавливаем ESP-CSI библиотеку
# В терминале (Windows: Git Bash) cd Documents/Arduino/libraries git clone https://github.com/espressif/esp-csi
-
Альтернативный способ:
Скачиваем ZIP с GitHub
В Arduino IDE: Скетч → Подключить библиотеку → Добавить .ZIP библиотеку
Выбираем скачанный файл
Для ESP-IDF:
# Клонируем репозиторий
git clone https://github.com/espressif/esp-csi
cd esp-csi
# Собираем и прошиваем
idf.py build
idf.py -p COM3 flash # Замените COM3 на ваш порт
Важно: При первой прошивке ESP32 может потребовать ручного входа в режим загрузки. Когда увидите "Connecting..." в консоли, зажмите кнопку BOOT на плате и держите до начала загрузки.
Подключение внешних антенн
Это самая деликатная операция — нужна аккуратность!
-
Подготовка платы
ВНИМАНИЕ: Отключите ESP32 от USB!
Найдите на плате небольшую антенну-змейку (обычно в углу)
Рядом с ней должна быть контактная площадка с надписью "IPEX" или "U.FL"
-
Отключение встроенной антенны
Аккуратно перережьте тонкую дорожку между встроенной антенной и основной платой
Альтернатива: отпаяйте резистор перемычку (если есть навыки пайки)
-
Подключение IPEX коннектора
Если коннектора нет — припаиваете его к контактной площадке
Если есть — просто подключаете кабель IPEX-SMA
Важно: не перетягивайте! Коннектор хрупкий
Шаг 2: Размещение оборудования в комнате

Правильное размещение антенн — половина успеха вашей лаборатории.
Идеальная конфигурация "треугольник":
ESP32 #1 с всенаправленной антенной:
Местоположение: Центр комнаты, высота 1.5-2 метра
Роль: Общий мониторинг активности
Особенности: Равномерное покрытие во все стороны
ESP32 #2 с панельной антенной:
Местоположение: Один из углов комнаты
Направление: На тестовую зону (где будете проводить эксперименты)
Роль: Детальный мониторинг определённой области
ESP32 #3 с Yagi антенной:
Местоположение: Противоположный угол от ESP32 #2
Направление: Узкий луч через тестовую зону
Роль: Высокочувствительная детекция мелких движений
Принципы размещения:
Избегайте металлических предметов рядом с антеннами
Высота имеет значение — чем выше, тем лучше обзор
Углы падения сигнала — располагайте антенны под разными углами
Минимум 1 метр между антеннами для избежания взаимных помех
Шаг 3: Настройка программного обеспечения
Теперь переходим к коду. Я приведу три варианта прошивки для разных уровней подготовки.
Прошивка для ESP32 (передатчик) — простая версия
Этот код превращает один ESP32 в источник сигнала. Он постоянно отправляет пакеты, которые будут принимать другие ESP32.
#include "freertos/FreeRTOS.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_timer.h"
// Тег для логов (чтобы отличать сообщения этого кода)
static const char* TAG = "CSI_SENDER";
// Структура для конфигурации WiFi
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
void wifi_init() {
// Инициализируем WiFi стек ESP32
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
// Переводим в режим станции (подключается к другим точкам доступа)
ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
// Запускаем WiFi
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "WiFi инициализирован как передатчик");
}
void send_ping_task(void *pvParameters) {
// Простой ping-пакет (можно заменить на любые данные)
uint8_t ping_packet[] = {0x08, 0x00, 0x01, 0x02, 0x03, 0x04};
while(1) {
// Отправляем ping каждые 50ms для стабильного потока CSI
// Этот интервал можно регулировать в зависимости от задач
esp_wifi_80211_tx(WIFI_IF_STA, ping_packet, sizeof(ping_packet), false);
// Задержка между пакетами
vTaskDelay(pdMS_TO_TICKS(50)); // 50 миллисекунд
// Выводим статистику каждую секунду
static int counter = 0;
if (++counter % 20 == 0) {
ESP_LOGI(TAG, "Отправлено %d пакетов", counter);
}
}
}
void app_main() {
// Главная функция программы
ESP_LOGI(TAG, "Запуск CSI передатчика v1.0");
// Инициализируем WiFi
wifi_init();
// Создаём задачу для отправки пакетов в отдельном потоке
// Это позволяет основной программе заниматься другими делами
xTaskCreate(send_ping_task, "ping_task", 4096, NULL, 5, NULL);
ESP_LOGI(TAG, "CSI передатчик готов к работе!");
}
Что делает этот код:
Инициализирует WiFi на ESP32 в режиме станции
Каждые 50мс отправляет небольшой пакет данных
Эти пакеты будут принимать другие ESP32 и извлекать из них CSI
Выводит статистику в консоль для контроля работы
Как загрузить:
Копируете код в новый проект Arduino IDE
Выбираете правильную плату и порт
Нажимаете "Загрузить"
В Serial Monitor (Ctrl+Shift+M) увидите логи работы
Прошивка для ESP32 (приемник) — с извлечением CSI
Этот код более сложный — он принимает пакеты и извлекает из них CSI данные.
#include "freertos/FreeRTOS.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_csi.h"
#include <string.h>
static const char* TAG = "CSI_RECEIVER";
// Callback функция — вызывается каждый раз при получении CSI данных
static void csi_recv_cb(void *ctx, wifi_csi_info_t *info) {
// Проверяем, что данные корректны
if (!info || !info->buf) {
ESP_LOGW(TAG, "CSI данные пусты!");
return;
}
// Получаем указатель на сырые CSI данные
// Каждый элемент — это комплексное число (реальная + мнимая часть)
int16_t *csi_data = (int16_t*)info->buf;
int data_len = info->len / 2; // Длина в комплексных числах
// Получаем текущее время для меток времени
int64_t timestamp = esp_timer_get_time() / 1000; // в миллисекундах
// Отправляем данные в компьютер в CSV формате
// Формат: CSI,время,реальная1,мнимая1,реальная2,мнимая2,...
printf("CSI,%lld", timestamp);
// Выводим все CSI данные
for(int i = 0; i < data_len; i++) {
printf(",%d", csi_data[i]);
}
printf("\n"); // Завершаем строку
// Дополнительная информация о пакете
if (info->rx_ctrl.rssi != 0) {
printf("RSSI,%lld,%d\n", timestamp, info->rx_ctrl.rssi);
}
}
void csi_init() {
// Конфигурация CSI — какие данные мы хотим получать
wifi_csi_config_t csi_config = {
.lltf_en = true, // Включить Legacy Long Training Field
.htltf_en = true, // Включить HT Long Training Field
.stbc_htltf2_en = true, // STBC HT-LTF2
.ltf_merge_en = false, // Не объединять LTF
.channel_filter_en = false, // Не фильтровать каналы
.manu_scale = false // Автоматическое масштабирование
};
// Применяем конфигурацию
ESP_ERROR_CHECK(esp_wifi_set_csi_config(&csi_config));
// Регистрируем callback функцию для получения CSI
ESP_ERROR_CHECK(esp_wifi_set_csi_rx_cb(csi_recv_cb, NULL));
// Включаем сбор CSI данных
ESP_ERROR_CHECK(esp_wifi_set_csi(true));
ESP_LOGI(TAG, "CSI инициализирован успешно");
}
void wifi_init() {
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
ESP_ERROR_CHECK(esp_wifi_start());
ESP_LOGI(TAG, "WiFi инициализирован как приемник");
}
void app_main() {
ESP_LOGI(TAG, "Запуск CSI приемника v1.0");
// Инициализируем WiFi
wifi_init();
// Настраиваем CSI
csi_init();
ESP_LOGI(TAG, "CSI приемник готов! Данные будут выводиться в Serial...");
// Основной цикл — просто ждём CSI данные
while(1) {
vTaskDelay(pdMS_TO_TICKS(1000)); // Спим 1 секунду
ESP_LOGI(TAG, "Работаем... (для остановки нажмите Reset)");
}
}
Что делает этот код:
Настраивает ESP32 для приёма и обработки CSI данных
При каждом полученном Wi-Fi пакете извлекает CSI и отправляет в Serial
Данные выходят в CSV формате — легко парсить в Python
Включает временные метки для синхронизации
Как использовать:
Загружаете код на ESP32
Открываете Serial Monitor (скорость 115200)
Увидите строки вида:
CSI,1634567890,124,-56,78,-123,...
Эти данные можно сохранять в файл или обрабатывать в реальном времени
Шаг 4: Python-скрипт для сбора и анализа данных
Теперь самое интересное — пишем Python код для сбора данных с ESP32 и их анализа. Давайте для того чтобы было интереснее попробуем сразу написать простейшую систему которая будут выполнять детекцию падений человека.

Базовая система сбора данных
import serial
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import threading
import time
import json
from datetime import datetime
from scipy import signal
from sklearn.preprocessing import StandardScaler
class WiFiSensingLab:
"""
Главный класс для управления Wi-Fi Sensing лабораторией
"""
def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
"""
Инициализация лаборатории
Args:
port: COM-порт для подключения ESP32 (Windows: 'COM3', Linux: '/dev/ttyUSB0')
baudrate: Скорость передачи данных (должна совпадать с ESP32)
"""
try:
self.serial_conn = serial.Serial(port, baudrate, timeout=1)
print(f"✅ Подключился к {port} со скоростью {baudrate}")
except serial.SerialException as e:
print(f"❌ Ошибка подключения к {port}: {e}")
print("Проверьте:")
print("1. Правильность порта (Windows: COM3, Linux: /dev/ttyUSB0)")
print("2. Установлены ли драйверы USB-UART")
print("3. Не занят ли порт другой программой")
raise
# Буфер для хранения последних измерений
self.csi_buffer = deque(maxlen=1000)
# Флаг для остановки сбора данных
self.is_collecting = False
# Для детекции падений
self.fall_threshold = 3.0 # Порог детекции (настраивается)
self.baseline_window = deque(maxlen=50) # Окно для базовой линии
# Статистика
self.packets_received = 0
self.start_time = time.time()
def collect_csi_data(self):
"""
Основной цикл сбора CSI данных с ESP32
Запускается в отдельном потоке
"""
print("? Начинаем сбор CSI данных...")
while self.is_collecting:
try:
# Читаем строку из Serial порта
line = self.serial_conn.readline().decode('utf-8').strip()
if not line:
continue
# Парсим данные в зависимости от типа
if line.startswith('CSI,'):
self._parse_csi_data(line)
elif line.startswith('RSSI,'):
self._parse_rssi_data(line)
else:
# Это может быть лог сообщение от ESP32
print(f"? ESP32: {line}")
except UnicodeDecodeError:
# Иногда приходят поврежденные байты — просто игнорируем
continue
except Exception as e:
print(f"❌ Ошибка чтения данных: {e}")
break
print("⛔ Сбор данных остановлен")
def _parse_csi_data(self, line):
"""
Парсинг CSI данных из строки формата: CSI,timestamp,data1,data2,...
"""
try:
# Разбиваем строку по запятым
parts = line.split(',')
if len(parts) < 3:
return # Недостаточно данных
timestamp = int(parts[1])
csi_values = [int(x) for x in parts[2:]]
# Преобразуем в numpy массив для удобства
csi_array = np.array(csi_values)
# Вычисляем амплитуду (модуль комплексного числа)
# CSI данные приходят как [real1, imag1, real2, imag2, ...]
amplitudes = []
for i in range(0, len(csi_values), 2):
if i+1 < len(csi_values):
real = csi_values[i]
imag = csi_values[i+1]
amplitude = np.sqrt(real*real + imag*imag)
amplitudes.append(amplitude)
# Средняя амплитуда по всем поднесущим
mean_amplitude = np.mean(amplitudes) if amplitudes else 0
# Сохраняем в буфер
data_point = {
'timestamp': timestamp,
'csi_raw': csi_array,
'amplitudes': np.array(amplitudes),
'mean_amplitude': mean_amplitude,
'received_at': datetime.now()
}
self.csi_buffer.append(data_point)
self.packets_received += 1
# Анализ в реальном времени
self.analyze_fall_detection()
# Статистика каждые 100 пакетов
if self.packets_received % 100 == 0:
elapsed = time.time() - self.start_time
rate = self.packets_received / elapsed
print(f"? Получено {self.packets_received} пакетов, скорость: {rate:.1f} пакетов/сек")
except (ValueError, IndexError) as e:
print(f"⚠️ Ошибка парсинга CSI: {e}, строка: {line[:50]}...")
def _parse_rssi_data(self, line):
"""
Парсинг RSSI данных (мощность сигнала)
"""
try:
parts = line.split(',')
if len(parts) >= 3:
timestamp = int(parts[1])
rssi = int(parts[2])
print(f"? RSSI: {rssi} dBm в {timestamp}")
except (ValueError, IndexError):
pass
def analyze_fall_detection(self):
"""
Простой алгоритм детекции падения на основе резких изменений CSI
"""
if len(self.csi_buffer) < 10:
return
# Получаем последние амплитуды
recent_data = list(self.csi_buffer)[-10:]
recent_amplitudes = [data['mean_amplitude'] for data in recent_data]
if not recent_amplitudes:
return
current_amplitude = recent_amplitudes[-1]
# Обновляем базовую линию (медленно адаптируется к изменениям)
if len(self.baseline_window) > 0:
baseline = np.mean(self.baseline_window)
deviation = abs(current_amplitude - baseline)
# Детекция падения: резкое изменение сигнала
if deviation > self.fall_threshold:
self.send_alert(f"Возможное падение! Отклонение: {deviation:.2f}")
# Сохраняем данные о событии для анализа
self._save_event_data("fall_detected", recent_data)
# Медленно обновляем базовую линию
self.baseline_window.append(current_amplitude)
def send_alert(self, message):
"""
Отправка уведомления о событии
"""
timestamp = datetime.now().strftime('%H:%M:%S')
print(f"? ALERT [{timestamp}]: {message}")
# Здесь можно добавить:
# - Отправку SMS через API
# - Push-уведомления
# - Запись в лог файл
# - Отправку в Telegram бот
def _save_event_data(self, event_type, data):
"""
Сохранение данных о событии для последующего анализа
"""
filename = f"event_{event_type}_{int(time.time())}.json"
# Конвертируем numpy массивы в списки для JSON
serializable_data = []
for item in data:
serializable_item = {
'timestamp': item['timestamp'],
'csi_raw': item['csi_raw'].tolist(),
'amplitudes': item['amplitudes'].tolist(),
'mean_amplitude': float(item['mean_amplitude']),
'received_at': item['received_at'].isoformat()
}
serializable_data.append(serializable_item)
with open(filename, 'w') as f:
json.dump({
'event_type': event_type,
'data': serializable_data,
'saved_at': datetime.now().isoformat()
}, f, indent=2)
print(f"? Данные события сохранены в {filename}")
def start_monitoring(self):
"""
Запуск мониторинга в реальном времени
"""
print("? Запуск Wi-Fi Sensing мониторинга...")
self.is_collecting = True
# Запускаем сбор данных в отдельном потоке
collection_thread = threading.Thread(target=self.collect_csi_data)
collection_thread.daemon = True # Поток завершится при завершении программы
collection_thread.start()
return collection_thread
def stop_monitoring(self):
"""
Остановка мониторинга
"""
print("⛔ Остановка мониторинга...")
self.is_collecting = False
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
Система визуализации в реальном времени
def create_realtime_visualizer(lab):
"""
Создание системы визуализации CSI данных в реальном времени
"""
plt.ion() # Включаем интерактивный режим matplotlib
# Создаём окно с двумя графиками
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
fig.suptitle('Wi-Fi Sensing Laboratory - Real-time CSI Data', fontsize=14)
def update_plots():
"""
Обновление графиков каждые 100мс
"""
while lab.is_collecting:
try:
if len(lab.csi_buffer) > 10:
# График 1: Амплитуды CSI во времени
recent_data = list(lab.csi_buffer)[-50:] # Последние 50 точек
timestamps = [data['timestamp'] for data in recent_data]
amplitudes = [data['mean_amplitude'] for data in recent_data]
ax1.clear()
ax1.plot(timestamps, amplitudes, 'b-', linewidth=2, label='CSI Amplitude')
# Добавляем базовую линию и порог детекции
if len(lab.baseline_window) > 0:
baseline = np.mean(lab.baseline_window)
ax1.axhline(y=baseline, color='g', linestyle='--', alpha=0.7, label='Baseline')
ax1.axhline(y=baseline + lab.fall_threshold, color='r', linestyle='--', alpha=0.7, label='Fall Threshold')
ax1.axhline(y=baseline - lab.fall_threshold, color='r', linestyle='--', alpha=0.7)
ax1.set_ylabel('Amplitude')
ax1.set_title('CSI Amplitude Over Time')
ax1.legend()
ax1.grid(True, alpha=0.3)
# График 2: Спектрограмма (тепловая карта CSI по поднесущим)
if len(recent_data) >= 20:
# Берём последние 20 измерений
csi_matrix = []
for data in recent_data[-20:]:
if len(data['amplitudes']) > 0:
csi_matrix.append(data['amplitudes'])
if csi_matrix:
csi_matrix = np.array(csi_matrix).T # Транспонируем для правильного отображения
ax2.clear()
im = ax2.imshow(csi_matrix, aspect='auto', cmap='viridis', interpolation='nearest')
ax2.set_ylabel('Subcarrier')
ax2.set_xlabel('Time (samples)')
ax2.set_title('CSI Spectrogram (Amplitude per Subcarrier)')
# Добавляем цветовую шкалу
if not hasattr(update_plots, 'colorbar'):
update_plots.colorbar = plt.colorbar(im, ax=ax2)
update_plots.colorbar.set_label('Amplitude')
plt.tight_layout()
plt.pause(0.1) # Обновляем каждые 100мс
else:
time.sleep(0.1) # Ждём появления данных
except Exception as e:
print(f"Ошибка визуализации: {e}")
break
# Запускаем визуализацию в отдельном потоке
viz_thread = threading.Thread(target=update_plots)
viz_thread.daemon = True
viz_thread.start()
return viz_thread
# Использование системы
if __name__ == "__main__":
print("=== Wi-Fi Sensing Laboratory v1.0 ===")
print("Автор: Алексей")
print("Нажмите Ctrl+C для остановки")
print()
# Создаём лабораторию (замените COM3 на ваш порт)
try:
lab = WiFiSensingLab(port='COM3', baudrate=115200) # Windows
# lab = WiFiSensingLab(port='/dev/ttyUSB0', baudrate=115200) # Linux
# Запускаем мониторинг
collection_thread = lab.start_monitoring()
# Запускаем визуализацию
viz_thread = create_realtime_visualizer(lab)
print("✅ Лаборатория запущена!")
print("? Графики откроются в отдельном окне")
print("? Система детекции падений активна")
print()
# Главный цикл программы
try:
while True:
time.sleep(1)
# Можно добавить интерактивные команды
# Например, изменение порога детекции падений
except KeyboardInterrupt:
print("\n⛔ Получен сигнал остановки...")
except Exception as e:
print(f"❌ Критическая ошибка: {e}")
finally:
# Чистое завершение работы
if 'lab' in locals():
lab.stop_monitoring()
print("? Лаборатория остановлена. До свидания!")
Сбор данных с нескольких ESP32 одновременно
Одно из главных преимуществ нашей лаборатории — возможность работы с несколькими ESP32 одновременно. Вот как это сделать:
Вариант 1: Несколько USB подключений
import threading
from queue import Queue
class MultiESP32Lab:
"""
Лаборатория для работы с несколькими ESP32 одновременно
"""
def __init__(self, ports_config):
"""
ports_config: список словарей с настройками для каждого ESP32
Пример: [
{'port': 'COM3', 'name': 'ESP32_Center', 'antenna': 'omni'},
{'port': 'COM4', 'name': 'ESP32_Corner', 'antenna': 'panel'},
{'port': 'COM5', 'name': 'ESP32_Yagi', 'antenna': 'yagi'}
]
"""
self.devices = []
self.data_queue = Queue() # Общая очередь для всех данных
self.is_collecting = False
# Инициализируем подключения ко всем ESP32
for config in ports_config:
try:
device = {
'name': config['name'],
'antenna': config['antenna'],
'serial': serial.Serial(config['port'], 115200, timeout=1),
'buffer': deque(maxlen=500),
'packets_count': 0
}
self.devices.append(device)
print(f"✅ {config['name']} подключен к {config['port']}")
except Exception as e:
print(f"❌ Ошибка подключения {config['name']}: {e}")
def collect_from_device(self, device):
"""
Сбор данных с одного ESP32 в отдельном потоке
"""
while self.is_collecting:
try:
line = device['serial'].readline().decode('utf-8').strip()
if line.startswith('CSI,'):
# Парсим данные (аналогично предыдущему коду)
parts = line.split(',')
timestamp = int(parts[1])
csi_values = [int(x) for x in parts[2:]]
# Добавляем информацию об устройстве
data_point = {
'device_name': device['name'],
'antenna_type': device['antenna'],
'timestamp': timestamp,
'csi_raw': np.array(csi_values),
'received_at': time.time()
}
# Сохраняем в буфер устройства и общую очередь
device['buffer'].append(data_point)
self.data_queue.put(data_point)
device['packets_count'] += 1
except Exception as e:
print(f"Ошибка чтения с {device['name']}: {e}")
break
def start_multi_collection(self):
"""
Запуск сбора данных со всех ESP32 одновременно
"""
self.is_collecting = True
threads = []
# Создаём отдельный поток для каждого ESP32
for device in self.devices:
thread = threading.Thread(target=self.collect_from_device, args=(device,))
thread.daemon = True
thread.start()
threads.append(thread)
print(f"? Запущен сбор данных с {len(self.devices)} устройств")
return threads
def get_synchronized_data(self, time_window=1000):
"""
Получение синхронизированных данных со всех устройств
time_window: окно времени в миллисекундах для синхронизации
"""
current_time = int(time.time() * 1000)
synchronized_data = {}
for device in self.devices:
# Ищем данные в временном окне
recent_data = []
for data_point in device['buffer']:
if abs(data_point['timestamp'] - current_time) <= time_window:
recent_data.append(data_point)
synchronized_data[device['name']] = recent_data
return synchronized_data
# Использование Multi-ESP32 системы
multi_lab_config = [
{'port': 'COM3', 'name': 'ESP32_Center', 'antenna': 'omni'},
{'port': 'COM4', 'name': 'ESP32_Corner', 'antenna': 'panel'},
{'port': 'COM5', 'name': 'ESP32_Yagi', 'antenna': 'yagi'}
]
multi_lab = MultiESP32Lab(multi_lab_config)
threads = multi_lab.start_multi_collection()
# Пример обработки синхронизированных данных
while True:
sync_data = multi_lab.get_synchronized_data()
# Анализируем данные с разных антенн
for device_name, data_list in sync_data.items():
if data_list:
print(f"{device_name}: получено {len(data_list)} пакетов")
time.sleep(1)
Вариант 2: MQTT для беспроводного сбора
Если не хочется возиться с проводами, можно настроить ESP32 на отправку данных по Wi-Fi через MQTT:
Код для ESP32 (отправка по MQTT):
#include <WiFi.h>
#include <PubSubClient.h>
#include "esp_csi.h"
const char* ssid = "ВашWiFi";
const char* password = "ВашПароль";
const char* mqtt_server = "192.168.1.100"; // IP вашего компьютера
WiFiClient espClient;
PubSubClient client(espClient);
void setup() {
Serial.begin(115200);
// Подключение к WiFi
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
// Подключение к MQTT брокеру
client.setServer(mqtt_server, 1883);
// Инициализация CSI
csi_init();
}
void csi_recv_cb(void *ctx, wifi_csi_info_t *info) {
// Формируем JSON с CSI данными
String json_data = "{";
json_data += "\"device\":\"ESP32_1\",";
json_data += "\"timestamp\":" + String(millis()) + ",";
json_data += "\"csi\":[";
int16_t *csi_data = (int16_t*)info->buf;
for(int i = 0; i < info->len/2; i++) {
if(i > 0) json_data += ",";
json_data += String(csi_data[i]);
}
json_data += "]}";
// Отправляем по MQTT
client.publish("wifi_sensing/csi", json_data.c_str());
}
Python код для приёма MQTT:
import paho.mqtt.client as mqtt
import json
def on_connect(client, userdata, flags, rc):
print(f"MQTT подключен с кодом {rc}")
client.subscribe("wifi_sensing/csi")
def on_message(client, userdata, msg):
try:
# Парсим JSON данные от ESP32
data = json.loads(msg.payload.decode())
device_name = data['device']
timestamp = data['timestamp']
csi_values = data['csi']
print(f"Получены данные от {device_name}: {len(csi_values)} значений CSI")
# Здесь обрабатываем данные аналогично Serial варианту
except Exception as e:
print(f"Ошибка парсинга MQTT: {e}")
# Настройка MQTT клиента
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
Этот код позволяет собирать данные с ESP32 по беспроводной сети, что упрощает размещение устройств в комнате.
Пять экспериментов для вашей лаборатории
Теперь, когда у нас есть полноценная лаборатория, вот пять интересных экспериментов разной сложности:
1. Детекция дыхания человека
Сложность: ⭐⭐☆☆☆ Оборудование: ESP32 + Yagi антенна Цель: Подсчёт частоты дыхания бесконтактно.
def breathing_detection_experiment():
"""
Эксперимент по детекции дыхания
"""
print("? === ЭКСПЕРИМЕНТ: ДЕТЕКЦИЯ ДЫХАНИЯ ===")
lab = WiFiSensingLab(port='COM3')
lab.start_monitoring()
print("? Сядьте на расстоянии 1-2 метра от Yagi антенны")
print("? Дышите спокойно и равномерно")
input("Нажмите Enter когда будете готовы...")
# Собираем данные 60 секунд
breathing_data = []
start_time = time.time()
while time.time() - start_time < 60:
if len(lab.csi_buffer) > 0:
latest = list(lab.csi_buffer)[-1]
breathing_data.append({
'timestamp': latest['timestamp'],
'amplitude': latest['mean_amplitude']
})
time.sleep(0.1) # 10 Hz
# Анализируем дыхание
timestamps = [d['timestamp'] for d in breathing_data]
amplitudes = [d['amplitude'] for d in breathing_data]
# Фильтруем шум (дыхание: 0.1-0.5 Hz)
from scipy.signal import butter, filtfilt
# Полосовой фильтр для частот дыхания
nyquist = 5 # Частота Найквиста (10 Hz / 2)
low_freq = 0.1 / nyquist # 0.1 Hz (6 вдохов/мин)
high_freq = 0.8 / nyquist # 0.8 Hz (48 вдохов/мин)
b, a = butter(4, [low_freq, high_freq], btype='band')
filtered_signal = filtfilt(b, a, amplitudes)
# Подсчёт пиков (вдохов)
from scipy.signal import find_peaks
peaks, _ = find_peaks(filtered_signal, distance=10, prominence=0.1)
breathing_rate = len(peaks) / 60 * 60 # вдохов в минуту
print(f"? Обнаружено {len(peaks)} вдохов за 60 секунд")
print(f"? Частота дыхания: {breathing_rate:.1f} вдохов/мин")
print(f"? Норма: 12-20 вдохов/мин")
# Визуализация
plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(amplitudes, label='Сырой сигнал')
plt.title('CSI Amplitude')
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(filtered_signal, label='Отфильтрованный сигнал')
plt.plot(peaks, filtered_signal[peaks], 'ro', label=f'Вдохи ({len(peaks)})')
plt.title('Детекция дыхания')
plt.legend()
plt.show()
2. Мониторинг сна
Сложность: ⭐⭐⭐☆☆ Оборудование: ESP32 + панельная антенна над кроватью Цель: Анализ качества сна и фаз
3. Система "умный дом"
Сложность: ⭐⭐⭐⭐☆ Оборудование: 3-5 ESP32 в разных комнатах Цель: Автоматизация освещения и климата
4. Детекция нескольких человек
Сложность: ⭐⭐⭐⭐⭐ Оборудование: Все антенны + машинное обучение Цель: Подсчёт и трекинг до 5 человек одновременно
5. Анализ походки
Сложность: ⭐⭐⭐☆☆ Оборудование: ESP32 + всенаправленная антенна Цель: Идентификация человека по походке
Заключение и дальнейшее развитие
Поздравляю! Вы создали полноценную исследовательскую лабораторию Wi-Fi sensing всего за 9290 рублей. Эта установка позволяет проводить серьёзные эксперименты и даже публиковать научные работы.
Что вы получили:
✅ Аппаратную платформу с тремя типами антенн
✅ Программное обеспечение для сбора и анализа данных
✅ Алгоритмы машинного обучения для классификации движений
✅ Систему детекции падений с практическим применением
✅ Базу для дальнейших экспериментов
Направления развития:
1. Edge AI на ESP32 Современные ESP32-S3 могут запускать небольшие нейросети прямо на борту. Это открывает возможности для создания полностью автономных систем мониторинга.
2. Mesh-сети Объединив десятки ESP32 в mesh-сеть, можно создать систему мониторинга целого здания или района.
3. Интеграция с IoT платформами Подключение к Home Assistant, Google Home или Apple HomeKit для создания по-настоящему умного дома.
4. Медицинские применения Система может быть адаптирована для мониторинга пожилых людей, пациентов с деменцией или людей с ограниченными возможностями.
Делитесь результатами экспериментов в комментариях! Особенно интересно увидеть:
Ваши конфигурации антенн
Результаты экспериментов
Модификации алгоритмов
Новые области применения
До новых встреч в мире Wi-Fi sensing! ?
P.S. Для скорости написания стати я выбрал примеры коды из разных своих проектов, что-то на ходу дописал, но не проверил работу кода в боевом режиме. Принципиально весь код верный, но где-то мог допустить ошибки и опечатки, так как все таки объем приведенного кода здесь достаточно большой. Заранее прошу сообщество меня извинить.
9a75sd
Есть еще идея: детекция присутствия в комнате, при этом отличая человека от кошки
+взять, например, ESP32S3, и собрать проект с поддержкой USB-хабов, и собирать эти данные. Поддержку хабов вроде как завезли в ESP-IDF v5.5