Пошаговый гайд по созданию полноценной исследовательской установки для Wi-Fi sensing с нуля

Привет, Хабр! Меня зовут Алексей, и сегодня мы переходим от теории к практике. В прошлой статье мы разобрались с концепцией Wi-Fi sensing, а сейчас я покажу, как собрать полноценную исследовательскую лабораторию в домашних условиях не дороже 9990 рублей. Статья содержит пошаговые инструкции от выбора оборудования до написания кода для детекции падений.

Что такое ESP32 и почему именно он?

Прежде чем перейти к сборке, давайте разберёмся, что такое ESP32 и почему именно эта платформа стала стандартом для IoT-проектов.

ESP32 — это микроконтроллер семейства System-on-Chip (SoC) от компании Espressif Systems, который произвёл революцию в мире любительской электроники. Представьте: на одном чипе размером с монету умещается полноценный компьютер с Wi-Fi, Bluetooth и мощным процессором.

Почему ESP32 — идеальный выбор для новичков:

1. Невысокая цена при мощных характеристиках:

  • Двухъядерный процессор Xtensa LX6 с частотой до 240 МГц

  • 520 КБ SRAM и до 16 МБ Flash памяти

  • Wi-Fi 802.11 b/g/n и Bluetooth 4.2/BLE встроенные

  • Цена от 550 рублей за модуль!

2. Простота программирования: ESP32 можно программировать в привычной среде Arduino IDE, что делает его доступным даже для школьников. Не нужно изучать сложные фреймворки — весь код пишется на знакомом C++.

3. Огромное сообщество: Миллионы разработчиков по всему миру используют ESP32. Это означает море примеров кода, библиотек и готовых решений. Застряли с проблемой? Скорее всего, кто-то уже решил её и выложил решение в интернет.

Почему ESP32 идеален для Wi-Fi Sensing:

Нативная поддержка CSI (Channel State Information) — это ключевая особенность ESP32. В отличие от большинства микроконтроллеров, ESP32 может извлекать детальную информацию о состоянии Wi-Fi канала прямо из чипа. Обычно эти данные скрыты глубоко в прошивке роутера, но ESP32 предоставляет к ним прямой доступ.

Представьте CSI как «отпечатки пальцев» радиосигнала. Каждое движение в комнате, каждый новый объект оставляет свой след в этих данных. ESP32 умеет считывать эти «отпечатки» и передавать их вашему коду для анализа.

Обзор библиотек и фреймворков

Прежде чем перейти к сборке оборудования, давайте разберёмся с программной частью. Экосистема ESP32 и Python предлагает богатый выбор инструментов.

ESP32 библиотеки и инструменты:

1. ESP-CSI (GitHub)

  • Автор: Espressif Systems (создатели ESP32)

  • Что делает: Официальная библиотека для работы с CSI данными

  • Особенности: Поддержка всех режимов Wi-Fi, готовые примеры кода

  • Использование: Основа для любых проектов Wi-Fi sensing на ESP32

2. ESP32-CSI-Tool (GitHub)

  • Автор: Steven M. Hernandez, исследователь из США

  • Что делает: Исследовательский toolkit с готовыми проектами

  • Особенности: Активный/пассивный режимы сбора, автоматическое сохранение на SD-карту

  • Использование: Отличная стартовая точка для экспериментов

3. Arduino Core for ESP32 (Документация)

  • Автор: Espressif Systems

  • Что делает: Позволяет программировать ESP32 в привычной Arduino IDE

  • Особенности: Простой синтаксис, тысячи готовых библиотек

  • Использование: Для тех, кто не хочет изучать сложный ESP-IDF

Python библиотеки для анализа данных:

1. NumPy (numpy.org)

  • Что делает: Быстрые математические операции с массивами данных

  • Почему важно: CSI данные — это многомерные массивы комплексных чисел

  • Пример использования: np.abs(csi_data) для получения амплитуд сигнала

2. Matplotlib (matplotlib.org)

  • Что делает: Создание графиков и визуализация данных в реальном времени

  • Особенности: Поддержка анимации, интерактивные графики

  • В нашем проекте: Отображение CSI данных в реальном времени

3. Scikit-learn (scikit-learn.org)

  • Что делает: Машинное обучение без глубокого погружения в математику

  • Применение: Классификация движений, детекция аномалий (падений)

  • Плюсы: Простой API, отличная документация

4. PySerial (PyPI)

  • Что делает: Связь с ESP32 через USB/UART

  • Важность: Без него невозможно получать данные с микроконтроллера

  • Установка: pip install pyserial

5. MQTT (Paho-MQTT) (PyPI)

  • Что делает: Беспроводная передача данных между устройствами

  • Применение: Сбор данных с нескольких ESP32 одновременно

  • Альтернатива: Если не хотите возиться с проводами

Список оборудования и где купить

Теперь переходим к самому интересному — закупке оборудования. Я тщательно подобрал компоненты по соотношению цена/качество, проверил наличие в российских магазинах и составил детальный список.Чтобы меня не обвинили в рекламе, я оставлю в таблице названия и примерные цены. Найти товары можно на WB и Озон или если хотите сэкономить бюджет и готовы подождать ищите на Aliexpress. Ну а если нужно очень срочно ищите оборудование в местных радиолюбительских магазинах.

Основные компоненты с актуальными ценами:

Компонент

Количество

Цена за шт.

Общая стоимость

ESP32-WROOM-32D с внешним антенным коннектором

3 шт

550₽

1650₽

Антенна всенаправленная 2.4GHz 3dBi

1 шт

300₽

300₽

Антенна панельная секторная 10dBi

1 шт

1400₽

1400₽

Антенна Yagi направленная 24dBi

1 шт

1800₽

1800₽

Кабель IPEX-SMA 20см

3 шт

580₽

1740₽

Штативы для антенн регулируемые

3 шт

400₽

1200₽

Breadboard и провода

1 комплект

1200₽

1200₽

Общая стоимость: 9290₽ — уложились в бюджет!

Почему именно эти компоненты:

ESP32-WROOM-32D с внешним коннектором — обратите внимание, что нужна именно версия с возможностью подключения внешней антенны. Обычные ESP32 с встроенной антенной-змейкой не подойдут для серьёзных экспериментов.

Три типа антенн для разных задач:

  • Всенаправленная (3dBi) — для общего мониторинга комнаты

  • Панельная (10dBi) — для направленного покрытия определённого сектора

  • Yagi (24dBi) — для точечного наблюдения с максимальной чувствительностью

Штативы — критически важны! Антенны должны стоять устойчиво и их положение должно легко регулироваться. Не экономьте на этом.

Пошаговая сборка лаборатории

Шаг 1: Подготовка ESP32 — подключение и прошивка

Этот шаг — самый важный и одновременно самый сложный для новичков. Я подробно распишу каждое действие.

Установка среды разработки

Вариант 1: Arduino IDE (проще для новичков)

  1. Скачиваем Arduino IDE

    • Идём на arduino.cc

    • Скачиваем версию 2.0 или выше

    • Устанавливаем как обычную программу

  2. Добавляем поддержку ESP32

    • Открываем Arduino IDE

    • Идём в меню Файл → Настройки (File → Preferences)

    • В поле "Дополнительные ссылки для Менеджера плат" вставляем:

      https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json
      
    • Нажимаем OK

  3. Устанавливаем ESP32 плату

    • Идём в Инструменты → Плата → Менеджер плат

    • В поиске вводим "ESP32"

    • Находим "ESP32 by Espressif Systems" и нажимаем Установить

    • Ждём 5-10 минут (зависит от скорости интернета)

Вариант 2: ESP-IDF (для продвинутых)

ESP-IDF — это профессиональная среда разработки от Espressif. Она сложнее, но даёт больше возможностей для работы с CSI.

  1. Установка через VS Code (рекомендуется)

    • Устанавливаем Visual Studio Code

    • Идём в Extensions (Ctrl+Shift+X)

    • Ищем "ESP-IDF" и устанавливаем расширение от Espressif

    • Нажимаем Ctrl+Shift+P, выбираем "ESP-IDF: Configure ESP-IDF Extension"

    • Выбираем Express, указываем версию ESP-IDF (рекомендую v5.1)

    • Нажимаем Install и ждём завершения (15-30 минут)

Подключение ESP32 к компьютеру

  1. Подключение по USB

    • Берём качественный USB кабель (дешёвые часто не работают!)

    • Подключаем ESP32 к компьютеру

    • Устанавливаем драйверы, если Windows их не находит автоматически

  2. Поиск COM-порта

    • Windows: Диспетчер устройств → Порты (COM и LPT) → ищем что-то вроде "Silicon Labs CP210x" или "CH340"

    • Linux/Mac: открываем терминал, вводим ls /dev/tty* и ищем /dev/ttyUSB0 или похожий

  3. Проверка связи

    • Открываем Arduino IDE

    • Выбираем плату: Инструменты → Плата → ESP32 Arduino → DOIT ESP32 DEVKIT V1

    • Выбираем порт: Инструменты → Порт → COM3 (ваш номер может отличаться)

    • Открываем пример: Файл → Примеры → WiFi → WiFiScan

    • Нажимаем стрелочку (загрузить)

    • Если всё хорошо, увидим "Загрузка завершена"

Прошивка CSI библиотеки

Теперь самое интересное — загружаем специальную прошивку для работы с CSI.

Для Arduino IDE:

  1. Устанавливаем ESP-CSI библиотеку

    # В терминале (Windows: Git Bash)
    cd Documents/Arduino/libraries
    git clone https://github.com/espressif/esp-csi
    
  2. Альтернативный способ:

    • Скачиваем ZIP с GitHub

    • В Arduino IDE: Скетч → Подключить библиотеку → Добавить .ZIP библиотеку

    • Выбираем скачанный файл

Для ESP-IDF:

# Клонируем репозиторий
git clone https://github.com/espressif/esp-csi
cd esp-csi
# Собираем и прошиваем
idf.py build
idf.py -p COM3 flash  # Замените COM3 на ваш порт

Важно: При первой прошивке ESP32 может потребовать ручного входа в режим загрузки. Когда увидите "Connecting..." в консоли, зажмите кнопку BOOT на плате и держите до начала загрузки.

Подключение внешних антенн

Это самая деликатная операция — нужна аккуратность!

  1. Подготовка платы

    • ВНИМАНИЕ: Отключите ESP32 от USB!

    • Найдите на плате небольшую антенну-змейку (обычно в углу)

    • Рядом с ней должна быть контактная площадка с надписью "IPEX" или "U.FL"

  2. Отключение встроенной антенны

    • Аккуратно перережьте тонкую дорожку между встроенной антенной и основной платой

    • Альтернатива: отпаяйте резистор перемычку (если есть навыки пайки)

  3. Подключение IPEX коннектора

    • Если коннектора нет — припаиваете его к контактной площадке

    • Если есть — просто подключаете кабель IPEX-SMA

    • Важно: не перетягивайте! Коннектор хрупкий

Шаг 2: Размещение оборудования в комнате

Пример компоновки оборудования
Пример компоновки оборудования

Правильное размещение антенн — половина успеха вашей лаборатории.

Идеальная конфигурация "треугольник":

ESP32 #1 с всенаправленной антенной:

  • Местоположение: Центр комнаты, высота 1.5-2 метра

  • Роль: Общий мониторинг активности

  • Особенности: Равномерное покрытие во все стороны

ESP32 #2 с панельной антенной:

  • Местоположение: Один из углов комнаты

  • Направление: На тестовую зону (где будете проводить эксперименты)

  • Роль: Детальный мониторинг определённой области

ESP32 #3 с Yagi антенной:

  • Местоположение: Противоположный угол от ESP32 #2

  • Направление: Узкий луч через тестовую зону

  • Роль: Высокочувствительная детекция мелких движений

Принципы размещения:

  1. Избегайте металлических предметов рядом с антеннами

  2. Высота имеет значение — чем выше, тем лучше обзор

  3. Углы падения сигнала — располагайте антенны под разными углами

  4. Минимум 1 метр между антеннами для избежания взаимных помех

Шаг 3: Настройка программного обеспечения

Теперь переходим к коду. Я приведу три варианта прошивки для разных уровней подготовки.

Прошивка для ESP32 (передатчик) — простая версия

Этот код превращает один ESP32 в источник сигнала. Он постоянно отправляет пакеты, которые будут принимать другие ESP32.

#include "freertos/FreeRTOS.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_timer.h"

// Тег для логов (чтобы отличать сообщения этого кода)
static const char* TAG = "CSI_SENDER";

// Структура для конфигурации WiFi
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();

void wifi_init() {
    // Инициализируем WiFi стек ESP32
    ESP_ERROR_CHECK(esp_wifi_init(&cfg));
    // Переводим в режим станции (подключается к другим точкам доступа)
    ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
    // Запускаем WiFi
    ESP_ERROR_CHECK(esp_wifi_start());

    ESP_LOGI(TAG, "WiFi инициализирован как передатчик");
}

void send_ping_task(void *pvParameters) {
    // Простой ping-пакет (можно заменить на любые данные)
    uint8_t ping_packet[] = {0x08, 0x00, 0x01, 0x02, 0x03, 0x04};

    while(1) {
        // Отправляем ping каждые 50ms для стабильного потока CSI
        // Этот интервал можно регулировать в зависимости от задач
        esp_wifi_80211_tx(WIFI_IF_STA, ping_packet, sizeof(ping_packet), false);

        // Задержка между пакетами
        vTaskDelay(pdMS_TO_TICKS(50)); // 50 миллисекунд

        // Выводим статистику каждую секунду
        static int counter = 0;
        if (++counter % 20 == 0) {
            ESP_LOGI(TAG, "Отправлено %d пакетов", counter);
        }
    }
}

void app_main() {
    // Главная функция программы
    ESP_LOGI(TAG, "Запуск CSI передатчика v1.0");

    // Инициализируем WiFi
    wifi_init();

    // Создаём задачу для отправки пакетов в отдельном потоке
    // Это позволяет основной программе заниматься другими делами
    xTaskCreate(send_ping_task, "ping_task", 4096, NULL, 5, NULL);

    ESP_LOGI(TAG, "CSI передатчик готов к работе!");
}

Что делает этот код:

  • Инициализирует WiFi на ESP32 в режиме станции

  • Каждые 50мс отправляет небольшой пакет данных

  • Эти пакеты будут принимать другие ESP32 и извлекать из них CSI

  • Выводит статистику в консоль для контроля работы

Как загрузить:

  1. Копируете код в новый проект Arduino IDE

  2. Выбираете правильную плату и порт

  3. Нажимаете "Загрузить"

  4. В Serial Monitor (Ctrl+Shift+M) увидите логи работы

Прошивка для ESP32 (приемник) — с извлечением CSI

Этот код более сложный — он принимает пакеты и извлекает из них CSI данные.

#include "freertos/FreeRTOS.h"
#include "esp_wifi.h"
#include "esp_log.h"
#include "esp_csi.h"
#include <string.h>

static const char* TAG = "CSI_RECEIVER";

// Callback функция — вызывается каждый раз при получении CSI данных
static void csi_recv_cb(void *ctx, wifi_csi_info_t *info) {
    // Проверяем, что данные корректны
    if (!info || !info->buf) {
        ESP_LOGW(TAG, "CSI данные пусты!");
        return;
    }

    // Получаем указатель на сырые CSI данные
    // Каждый элемент — это комплексное число (реальная + мнимая часть)
    int16_t *csi_data = (int16_t*)info->buf;
    int data_len = info->len / 2; // Длина в комплексных числах

    // Получаем текущее время для меток времени
    int64_t timestamp = esp_timer_get_time() / 1000; // в миллисекундах

    // Отправляем данные в компьютер в CSV формате
    // Формат: CSI,время,реальная1,мнимая1,реальная2,мнимая2,...
    printf("CSI,%lld", timestamp);

    // Выводим все CSI данные
    for(int i = 0; i < data_len; i++) {
        printf(",%d", csi_data[i]);
    }
    printf("\n"); // Завершаем строку

    // Дополнительная информация о пакете
    if (info->rx_ctrl.rssi != 0) {
        printf("RSSI,%lld,%d\n", timestamp, info->rx_ctrl.rssi);
    }
}

void csi_init() {
    // Конфигурация CSI — какие данные мы хотим получать
    wifi_csi_config_t csi_config = {
        .lltf_en = true,           // Включить Legacy Long Training Field
        .htltf_en = true,          // Включить HT Long Training Field  
        .stbc_htltf2_en = true,    // STBC HT-LTF2
        .ltf_merge_en = false,     // Не объединять LTF
        .channel_filter_en = false, // Не фильтровать каналы
        .manu_scale = false        // Автоматическое масштабирование
    };

    // Применяем конфигурацию
    ESP_ERROR_CHECK(esp_wifi_set_csi_config(&csi_config));

    // Регистрируем callback функцию для получения CSI
    ESP_ERROR_CHECK(esp_wifi_set_csi_rx_cb(csi_recv_cb, NULL));

    // Включаем сбор CSI данных
    ESP_ERROR_CHECK(esp_wifi_set_csi(true));

    ESP_LOGI(TAG, "CSI инициализирован успешно");
}

void wifi_init() {
    wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
    ESP_ERROR_CHECK(esp_wifi_init(&cfg));
    ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
    ESP_ERROR_CHECK(esp_wifi_start());

    ESP_LOGI(TAG, "WiFi инициализирован как приемник");
}

void app_main() {
    ESP_LOGI(TAG, "Запуск CSI приемника v1.0");

    // Инициализируем WiFi
    wifi_init();

    // Настраиваем CSI
    csi_init();

    ESP_LOGI(TAG, "CSI приемник готов! Данные будут выводиться в Serial...");

    // Основной цикл — просто ждём CSI данные
    while(1) {
        vTaskDelay(pdMS_TO_TICKS(1000)); // Спим 1 секунду
        ESP_LOGI(TAG, "Работаем... (для остановки нажмите Reset)");
    }
}

Что делает этот код:

  • Настраивает ESP32 для приёма и обработки CSI данных

  • При каждом полученном Wi-Fi пакете извлекает CSI и отправляет в Serial

  • Данные выходят в CSV формате — легко парсить в Python

  • Включает временные метки для синхронизации

Как использовать:

  1. Загружаете код на ESP32

  2. Открываете Serial Monitor (скорость 115200)

  3. Увидите строки вида: CSI,1634567890,124,-56,78,-123,...

  4. Эти данные можно сохранять в файл или обрабатывать в реальном времени

Шаг 4: Python-скрипт для сбора и анализа данных

Теперь самое интересное — пишем Python код для сбора данных с ESP32 и их анализа. Давайте для того чтобы было интереснее попробуем сразу написать простейшую систему которая будут выполнять детекцию падений человека.

Базовая система сбора данных

import serial
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import threading
import time
import json
from datetime import datetime
from scipy import signal
from sklearn.preprocessing import StandardScaler

class WiFiSensingLab:
    """
    Главный класс для управления Wi-Fi Sensing лабораторией
    """
    def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
        """
        Инициализация лаборатории

        Args:
            port: COM-порт для подключения ESP32 (Windows: 'COM3', Linux: '/dev/ttyUSB0')
            baudrate: Скорость передачи данных (должна совпадать с ESP32)
        """
        try:
            self.serial_conn = serial.Serial(port, baudrate, timeout=1)
            print(f"✅ Подключился к {port} со скоростью {baudrate}")
        except serial.SerialException as e:
            print(f"❌ Ошибка подключения к {port}: {e}")
            print("Проверьте:")
            print("1. Правильность порта (Windows: COM3, Linux: /dev/ttyUSB0)")
            print("2. Установлены ли драйверы USB-UART")
            print("3. Не занят ли порт другой программой")
            raise

        # Буфер для хранения последних измерений
        self.csi_buffer = deque(maxlen=1000)  

        # Флаг для остановки сбора данных
        self.is_collecting = False

        # Для детекции падений
        self.fall_threshold = 3.0  # Порог детекции (настраивается)
        self.baseline_window = deque(maxlen=50)  # Окно для базовой линии

        # Статистика
        self.packets_received = 0
        self.start_time = time.time()

    def collect_csi_data(self):
        """
        Основной цикл сбора CSI данных с ESP32
        Запускается в отдельном потоке
        """
        print("? Начинаем сбор CSI данных...")

        while self.is_collecting:
            try:
                # Читаем строку из Serial порта
                line = self.serial_conn.readline().decode('utf-8').strip()

                if not line:
                    continue

                # Парсим данные в зависимости от типа
                if line.startswith('CSI,'):
                    self._parse_csi_data(line)
                elif line.startswith('RSSI,'):
                    self._parse_rssi_data(line)
                else:
                    # Это может быть лог сообщение от ESP32
                    print(f"? ESP32: {line}")

            except UnicodeDecodeError:
                # Иногда приходят поврежденные байты — просто игнорируем
                continue
            except Exception as e:
                print(f"❌ Ошибка чтения данных: {e}")
                break

        print("⛔ Сбор данных остановлен")

    def _parse_csi_data(self, line):
        """
        Парсинг CSI данных из строки формата: CSI,timestamp,data1,data2,...
        """
        try:
            # Разбиваем строку по запятым
            parts = line.split(',')

            if len(parts) < 3:
                return  # Недостаточно данных

            timestamp = int(parts[1])
            csi_values = [int(x) for x in parts[2:]]

            # Преобразуем в numpy массив для удобства
            csi_array = np.array(csi_values)

            # Вычисляем амплитуду (модуль комплексного числа)
            # CSI данные приходят как [real1, imag1, real2, imag2, ...]
            amplitudes = []
            for i in range(0, len(csi_values), 2):
                if i+1 < len(csi_values):
                    real = csi_values[i]
                    imag = csi_values[i+1]
                    amplitude = np.sqrt(real*real + imag*imag)
                    amplitudes.append(amplitude)

            # Средняя амплитуда по всем поднесущим
            mean_amplitude = np.mean(amplitudes) if amplitudes else 0

            # Сохраняем в буфер
            data_point = {
                'timestamp': timestamp,
                'csi_raw': csi_array,
                'amplitudes': np.array(amplitudes),
                'mean_amplitude': mean_amplitude,
                'received_at': datetime.now()
            }

            self.csi_buffer.append(data_point)
            self.packets_received += 1

            # Анализ в реальном времени
            self.analyze_fall_detection()

            # Статистика каждые 100 пакетов
            if self.packets_received % 100 == 0:
                elapsed = time.time() - self.start_time
                rate = self.packets_received / elapsed
                print(f"? Получено {self.packets_received} пакетов, скорость: {rate:.1f} пакетов/сек")

        except (ValueError, IndexError) as e:
            print(f"⚠️ Ошибка парсинга CSI: {e}, строка: {line[:50]}...")

    def _parse_rssi_data(self, line):
        """
        Парсинг RSSI данных (мощность сигнала)
        """
        try:
            parts = line.split(',')
            if len(parts) >= 3:
                timestamp = int(parts[1])
                rssi = int(parts[2])
                print(f"? RSSI: {rssi} dBm в {timestamp}")
        except (ValueError, IndexError):
            pass

    def analyze_fall_detection(self):
        """
        Простой алгоритм детекции падения на основе резких изменений CSI
        """
        if len(self.csi_buffer) < 10:
            return

        # Получаем последние амплитуды
        recent_data = list(self.csi_buffer)[-10:]
        recent_amplitudes = [data['mean_amplitude'] for data in recent_data]

        if not recent_amplitudes:
            return

        current_amplitude = recent_amplitudes[-1]

        # Обновляем базовую линию (медленно адаптируется к изменениям)
        if len(self.baseline_window) > 0:
            baseline = np.mean(self.baseline_window)
            deviation = abs(current_amplitude - baseline)

            # Детекция падения: резкое изменение сигнала
            if deviation > self.fall_threshold:
                self.send_alert(f"Возможное падение! Отклонение: {deviation:.2f}")

                # Сохраняем данные о событии для анализа
                self._save_event_data("fall_detected", recent_data)

        # Медленно обновляем базовую линию
        self.baseline_window.append(current_amplitude)

    def send_alert(self, message):
        """
        Отправка уведомления о событии
        """
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"? ALERT [{timestamp}]: {message}")

        # Здесь можно добавить:
        # - Отправку SMS через API
        # - Push-уведомления
        # - Запись в лог файл
        # - Отправку в Telegram бот

    def _save_event_data(self, event_type, data):
        """
        Сохранение данных о событии для последующего анализа
        """
        filename = f"event_{event_type}_{int(time.time())}.json"

        # Конвертируем numpy массивы в списки для JSON
        serializable_data = []
        for item in data:
            serializable_item = {
                'timestamp': item['timestamp'],
                'csi_raw': item['csi_raw'].tolist(),
                'amplitudes': item['amplitudes'].tolist(),
                'mean_amplitude': float(item['mean_amplitude']),
                'received_at': item['received_at'].isoformat()
            }
            serializable_data.append(serializable_item)

        with open(filename, 'w') as f:
            json.dump({
                'event_type': event_type,
                'data': serializable_data,
                'saved_at': datetime.now().isoformat()
            }, f, indent=2)

        print(f"? Данные события сохранены в {filename}")

    def start_monitoring(self):
        """
        Запуск мониторинга в реальном времени
        """
        print("? Запуск Wi-Fi Sensing мониторинга...")
        self.is_collecting = True

        # Запускаем сбор данных в отдельном потоке
        collection_thread = threading.Thread(target=self.collect_csi_data)
        collection_thread.daemon = True  # Поток завершится при завершении программы
        collection_thread.start()

        return collection_thread

    def stop_monitoring(self):
        """
        Остановка мониторинга
        """
        print("⛔ Остановка мониторинга...")
        self.is_collecting = False
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.close()

Система визуализации в реальном времени

def create_realtime_visualizer(lab):
    """
    Создание системы визуализации CSI данных в реальном времени
    """
    plt.ion()  # Включаем интерактивный режим matplotlib

    # Создаём окно с двумя графиками
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    fig.suptitle('Wi-Fi Sensing Laboratory - Real-time CSI Data', fontsize=14)

    def update_plots():
        """
        Обновление графиков каждые 100мс
        """
        while lab.is_collecting:
            try:
                if len(lab.csi_buffer) > 10:
                    # График 1: Амплитуды CSI во времени
                    recent_data = list(lab.csi_buffer)[-50:]  # Последние 50 точек
                    timestamps = [data['timestamp'] for data in recent_data]
                    amplitudes = [data['mean_amplitude'] for data in recent_data]

                    ax1.clear()
                    ax1.plot(timestamps, amplitudes, 'b-', linewidth=2, label='CSI Amplitude')

                    # Добавляем базовую линию и порог детекции
                    if len(lab.baseline_window) > 0:
                        baseline = np.mean(lab.baseline_window)
                        ax1.axhline(y=baseline, color='g', linestyle='--', alpha=0.7, label='Baseline')
                        ax1.axhline(y=baseline + lab.fall_threshold, color='r', linestyle='--', alpha=0.7, label='Fall Threshold')
                        ax1.axhline(y=baseline - lab.fall_threshold, color='r', linestyle='--', alpha=0.7)

                    ax1.set_ylabel('Amplitude')
                    ax1.set_title('CSI Amplitude Over Time')
                    ax1.legend()
                    ax1.grid(True, alpha=0.3)

                    # График 2: Спектрограмма (тепловая карта CSI по поднесущим)
                    if len(recent_data) >= 20:
                        # Берём последние 20 измерений
                        csi_matrix = []
                        for data in recent_data[-20:]:
                            if len(data['amplitudes']) > 0:
                                csi_matrix.append(data['amplitudes'])

                        if csi_matrix:
                            csi_matrix = np.array(csi_matrix).T  # Транспонируем для правильного отображения

                            ax2.clear()
                            im = ax2.imshow(csi_matrix, aspect='auto', cmap='viridis', interpolation='nearest')
                            ax2.set_ylabel('Subcarrier')
                            ax2.set_xlabel('Time (samples)')
                            ax2.set_title('CSI Spectrogram (Amplitude per Subcarrier)')

                            # Добавляем цветовую шкалу
                            if not hasattr(update_plots, 'colorbar'):
                                update_plots.colorbar = plt.colorbar(im, ax=ax2)
                                update_plots.colorbar.set_label('Amplitude')

                    plt.tight_layout()
                    plt.pause(0.1)  # Обновляем каждые 100мс

                else:
                    time.sleep(0.1)  # Ждём появления данных

            except Exception as e:
                print(f"Ошибка визуализации: {e}")
                break

    # Запускаем визуализацию в отдельном потоке
    viz_thread = threading.Thread(target=update_plots)
    viz_thread.daemon = True
    viz_thread.start()

    return viz_thread

# Использование системы
if __name__ == "__main__":
    print("=== Wi-Fi Sensing Laboratory v1.0 ===")
    print("Автор: Алексей")
    print("Нажмите Ctrl+C для остановки")
    print()

    # Создаём лабораторию (замените COM3 на ваш порт)
    try:
        lab = WiFiSensingLab(port='COM3', baudrate=115200)  # Windows
        # lab = WiFiSensingLab(port='/dev/ttyUSB0', baudrate=115200)  # Linux

        # Запускаем мониторинг
        collection_thread = lab.start_monitoring()

        # Запускаем визуализацию
        viz_thread = create_realtime_visualizer(lab)

        print("✅ Лаборатория запущена!")
        print("? Графики откроются в отдельном окне")
        print("? Система детекции падений активна")
        print()

        # Главный цикл программы
        try:
            while True:
                time.sleep(1)

                # Можно добавить интерактивные команды
                # Например, изменение порога детекции падений

        except KeyboardInterrupt:
            print("\n⛔ Получен сигнал остановки...")

    except Exception as e:
        print(f"❌ Критическая ошибка: {e}")

    finally:
        # Чистое завершение работы
        if 'lab' in locals():
            lab.stop_monitoring()
        print("? Лаборатория остановлена. До свидания!")

Сбор данных с нескольких ESP32 одновременно

Одно из главных преимуществ нашей лаборатории — возможность работы с несколькими ESP32 одновременно. Вот как это сделать:

Вариант 1: Несколько USB подключений

import threading
from queue import Queue

class MultiESP32Lab:
    """
    Лаборатория для работы с несколькими ESP32 одновременно
    """
    def __init__(self, ports_config):
        """
        ports_config: список словарей с настройками для каждого ESP32
        Пример: [
            {'port': 'COM3', 'name': 'ESP32_Center', 'antenna': 'omni'},
            {'port': 'COM4', 'name': 'ESP32_Corner', 'antenna': 'panel'},
            {'port': 'COM5', 'name': 'ESP32_Yagi', 'antenna': 'yagi'}
        ]
        """
        self.devices = []
        self.data_queue = Queue()  # Общая очередь для всех данных
        self.is_collecting = False

        # Инициализируем подключения ко всем ESP32
        for config in ports_config:
            try:
                device = {
                    'name': config['name'],
                    'antenna': config['antenna'],
                    'serial': serial.Serial(config['port'], 115200, timeout=1),
                    'buffer': deque(maxlen=500),
                    'packets_count': 0
                }
                self.devices.append(device)
                print(f"✅ {config['name']} подключен к {config['port']}")
            except Exception as e:
                print(f"❌ Ошибка подключения {config['name']}: {e}")

    def collect_from_device(self, device):
        """
        Сбор данных с одного ESP32 в отдельном потоке
        """
        while self.is_collecting:
            try:
                line = device['serial'].readline().decode('utf-8').strip()

                if line.startswith('CSI,'):
                    # Парсим данные (аналогично предыдущему коду)
                    parts = line.split(',')
                    timestamp = int(parts[1])
                    csi_values = [int(x) for x in parts[2:]]

                    # Добавляем информацию об устройстве
                    data_point = {
                        'device_name': device['name'],
                        'antenna_type': device['antenna'],
                        'timestamp': timestamp,
                        'csi_raw': np.array(csi_values),
                        'received_at': time.time()
                    }

                    # Сохраняем в буфер устройства и общую очередь
                    device['buffer'].append(data_point)
                    self.data_queue.put(data_point)
                    device['packets_count'] += 1

            except Exception as e:
                print(f"Ошибка чтения с {device['name']}: {e}")
                break

    def start_multi_collection(self):
        """
        Запуск сбора данных со всех ESP32 одновременно
        """
        self.is_collecting = True
        threads = []

        # Создаём отдельный поток для каждого ESP32
        for device in self.devices:
            thread = threading.Thread(target=self.collect_from_device, args=(device,))
            thread.daemon = True
            thread.start()
            threads.append(thread)

        print(f"? Запущен сбор данных с {len(self.devices)} устройств")
        return threads

    def get_synchronized_data(self, time_window=1000):
        """
        Получение синхронизированных данных со всех устройств

        time_window: окно времени в миллисекундах для синхронизации
        """
        current_time = int(time.time() * 1000)
        synchronized_data = {}

        for device in self.devices:
            # Ищем данные в временном окне
            recent_data = []
            for data_point in device['buffer']:
                if abs(data_point['timestamp'] - current_time) <= time_window:
                    recent_data.append(data_point)

            synchronized_data[device['name']] = recent_data

        return synchronized_data

# Использование Multi-ESP32 системы
multi_lab_config = [
    {'port': 'COM3', 'name': 'ESP32_Center', 'antenna': 'omni'},
    {'port': 'COM4', 'name': 'ESP32_Corner', 'antenna': 'panel'},
    {'port': 'COM5', 'name': 'ESP32_Yagi', 'antenna': 'yagi'}
]

multi_lab = MultiESP32Lab(multi_lab_config)
threads = multi_lab.start_multi_collection()

# Пример обработки синхронизированных данных
while True:
    sync_data = multi_lab.get_synchronized_data()

    # Анализируем данные с разных антенн
    for device_name, data_list in sync_data.items():
        if data_list:
            print(f"{device_name}: получено {len(data_list)} пакетов")

    time.sleep(1)

Вариант 2: MQTT для беспроводного сбора

Если не хочется возиться с проводами, можно настроить ESP32 на отправку данных по Wi-Fi через MQTT:

Код для ESP32 (отправка по MQTT):

#include <WiFi.h>
#include <PubSubClient.h>
#include "esp_csi.h"

const char* ssid = "ВашWiFi";
const char* password = "ВашПароль";
const char* mqtt_server = "192.168.1.100";  // IP вашего компьютера

WiFiClient espClient;
PubSubClient client(espClient);

void setup() {
    Serial.begin(115200);

    // Подключение к WiFi
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }

    // Подключение к MQTT брокеру
    client.setServer(mqtt_server, 1883);

    // Инициализация CSI
    csi_init();
}

void csi_recv_cb(void *ctx, wifi_csi_info_t *info) {
    // Формируем JSON с CSI данными
    String json_data = "{";
    json_data += "\"device\":\"ESP32_1\",";
    json_data += "\"timestamp\":" + String(millis()) + ",";
    json_data += "\"csi\":[";

    int16_t *csi_data = (int16_t*)info->buf;
    for(int i = 0; i < info->len/2; i++) {
        if(i > 0) json_data += ",";
        json_data += String(csi_data[i]);
    }
    json_data += "]}";

    // Отправляем по MQTT
    client.publish("wifi_sensing/csi", json_data.c_str());
}

Python код для приёма MQTT:

import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
    print(f"MQTT подключен с кодом {rc}")
    client.subscribe("wifi_sensing/csi")

def on_message(client, userdata, msg):
    try:
        # Парсим JSON данные от ESP32
        data = json.loads(msg.payload.decode())
        device_name = data['device']
        timestamp = data['timestamp']
        csi_values = data['csi']

        print(f"Получены данные от {device_name}: {len(csi_values)} значений CSI")

        # Здесь обрабатываем данные аналогично Serial варианту

    except Exception as e:
        print(f"Ошибка парсинга MQTT: {e}")

# Настройка MQTT клиента
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost", 1883, 60)
client.loop_forever()

Этот код позволяет собирать данные с ESP32 по беспроводной сети, что упрощает размещение устройств в комнате.

Пять экспериментов для вашей лаборатории

Теперь, когда у нас есть полноценная лаборатория, вот пять интересных экспериментов разной сложности:

1. Детекция дыхания человека

Сложность: ⭐⭐☆☆☆ Оборудование: ESP32 + Yagi антенна Цель: Подсчёт частоты дыхания бесконтактно.

def breathing_detection_experiment():
    """
    Эксперимент по детекции дыхания
    """
    print("? === ЭКСПЕРИМЕНТ: ДЕТЕКЦИЯ ДЫХАНИЯ ===")

    lab = WiFiSensingLab(port='COM3')  
    lab.start_monitoring()

    print("? Сядьте на расстоянии 1-2 метра от Yagi антенны")
    print("? Дышите спокойно и равномерно")
    input("Нажмите Enter когда будете готовы...")

    # Собираем данные 60 секунд
    breathing_data = []
    start_time = time.time()

    while time.time() - start_time < 60:
        if len(lab.csi_buffer) > 0:
            latest = list(lab.csi_buffer)[-1]
            breathing_data.append({
                'timestamp': latest['timestamp'],
                'amplitude': latest['mean_amplitude']
            })
        time.sleep(0.1)  # 10 Hz

    # Анализируем дыхание
    timestamps = [d['timestamp'] for d in breathing_data]
    amplitudes = [d['amplitude'] for d in breathing_data]

    # Фильтруем шум (дыхание: 0.1-0.5 Hz)
    from scipy.signal import butter, filtfilt

    # Полосовой фильтр для частот дыхания
    nyquist = 5  # Частота Найквиста (10 Hz / 2)
    low_freq = 0.1 / nyquist   # 0.1 Hz (6 вдохов/мин)
    high_freq = 0.8 / nyquist  # 0.8 Hz (48 вдохов/мин)

    b, a = butter(4, [low_freq, high_freq], btype='band')
    filtered_signal = filtfilt(b, a, amplitudes)

    # Подсчёт пиков (вдохов)
    from scipy.signal import find_peaks

    peaks, _ = find_peaks(filtered_signal, distance=10, prominence=0.1)
    breathing_rate = len(peaks) / 60 * 60  # вдохов в минуту

    print(f"? Обнаружено {len(peaks)} вдохов за 60 секунд")
    print(f"? Частота дыхания: {breathing_rate:.1f} вдохов/мин")
    print(f"? Норма: 12-20 вдохов/мин")

    # Визуализация
    plt.figure(figsize=(12, 6))
    plt.subplot(2, 1, 1)
    plt.plot(amplitudes, label='Сырой сигнал')
    plt.title('CSI Amplitude')
    plt.legend()

    plt.subplot(2, 1, 2)  
    plt.plot(filtered_signal, label='Отфильтрованный сигнал')
    plt.plot(peaks, filtered_signal[peaks], 'ro', label=f'Вдохи ({len(peaks)})')
    plt.title('Детекция дыхания')
    plt.legend()
    plt.show()

2. Мониторинг сна

Сложность: ⭐⭐⭐☆☆ Оборудование: ESP32 + панельная антенна над кроватью Цель: Анализ качества сна и фаз

3. Система "умный дом"

Сложность: ⭐⭐⭐⭐☆ Оборудование: 3-5 ESP32 в разных комнатах Цель: Автоматизация освещения и климата

4. Детекция нескольких человек

Сложность: ⭐⭐⭐⭐⭐ Оборудование: Все антенны + машинное обучение Цель: Подсчёт и трекинг до 5 человек одновременно

5. Анализ походки

Сложность: ⭐⭐⭐☆☆ Оборудование: ESP32 + всенаправленная антенна Цель: Идентификация человека по походке

Заключение и дальнейшее развитие

Поздравляю! Вы создали полноценную исследовательскую лабораторию Wi-Fi sensing всего за 9290 рублей. Эта установка позволяет проводить серьёзные эксперименты и даже публиковать научные работы.

Что вы получили:

  • Аппаратную платформу с тремя типами антенн

  • Программное обеспечение для сбора и анализа данных

  • Алгоритмы машинного обучения для классификации движений

  • Систему детекции падений с практическим применением

  • Базу для дальнейших экспериментов

Направления развития:

1. Edge AI на ESP32 Современные ESP32-S3 могут запускать небольшие нейросети прямо на борту. Это открывает возможности для создания полностью автономных систем мониторинга.

2. Mesh-сети Объединив десятки ESP32 в mesh-сеть, можно создать систему мониторинга целого здания или района.

3. Интеграция с IoT платформами Подключение к Home Assistant, Google Home или Apple HomeKit для создания по-настоящему умного дома.

4. Медицинские применения Система может быть адаптирована для мониторинга пожилых людей, пациентов с деменцией или людей с ограниченными возможностями.

Делитесь результатами экспериментов в комментариях! Особенно интересно увидеть:

  • Ваши конфигурации антенн

  • Результаты экспериментов

  • Модификации алгоритмов

  • Новые области применения

До новых встреч в мире Wi-Fi sensing! ?


P.S. Для скорости написания стати я выбрал примеры коды из разных своих проектов, что-то на ходу дописал, но не проверил работу кода в боевом режиме. Принципиально весь код верный, но где-то мог допустить ошибки и опечатки, так как все таки объем приведенного кода здесь достаточно большой. Заранее прошу сообщество меня извинить.

Комментарии (1)


  1. 9a75sd
    24.08.2025 09:45

    Есть еще идея: детекция присутствия в комнате, при этом отличая человека от кошки

    +взять, например, ESP32S3, и собрать проект с поддержкой USB-хабов, и собирать эти данные. Поддержку хабов вроде как завезли в ESP-IDF v5.5