Привет, Хабр! Сегодня я расскажу, как на Rust создать собственный протокол обмена файлами, используя библиотеку libp2p.

Создаем проект

Начнем с самого начала. Открываем терминал и выполняем следующие команды:

cargo new p2p-file-exchange
cd p2p-file-exchange

Команды создадут новый проект на Rust с именем p2p-file-exchange и переключат текущий каталог на него.

После открываем Cargo.toml и добавляем следующие зависимости:

[dependencies]
libp2p = { version = "0.51", features = ["tcp-tokio", "dns", "noise", "mplex", "identify"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-trait = "0.1"
futures = "0.3"
  • libp2p: основная библиотека для создания P2P-сетей.

  • tokio: асинхронный рантайм для Rust.

  • serde и serde_json: для сериализации и десериализации данных.

  • async-trait: позволяет использовать асинхронные трейты.

  • futures: для работы с асинхронными потоками.

Основная структура

Теперь перейдём к src/main.rs. Начнем с импорта необходимых модулей:

use libp2p::{
    identity,
    PeerId,
    Swarm,
    Multiaddr,
    Transport,
    noise,
    tcp::TokioTcpConfig,
    mplex,
    core::upgrade,
    request_response::{
        ProtocolName, ProtocolSupport, RequestResponse, RequestResponseConfig,
        RequestResponseCodec, RequestResponseEvent, RequestResponseMessage,
        RequestResponseProtocol,
    },
};
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use std::error::Error;
use futures::prelude::*;

Импортируем основные компоненты из libp2p, а также необходимые библиотеки.

Для обмена файлами нужен простой протокол. Создадим структуры для запросов и ответов:

#[derive(Debug, Serialize, Deserialize)]
struct FileRequest {
    filename: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct FileResponse {
    content: Vec<u8>,
}

Эти структуры определяют формат запросов и ответов. FileRequest содержит имя файла, который запрашивается, а FileResponse — содержимое этого файла в виде массива байтов.

Теперь определим протокол:

#[derive(Debug, Clone)]
struct FileProtocol();

#[derive(Clone)]
struct FileCodec();

#[derive(Debug, Clone, PartialEq, Eq)]
struct FileProtocolName;

impl ProtocolName for FileProtocolName {
    fn protocol_name(&self) -> &[u8] {
        b"/p2pfile/1.0.0"
    }
}

#[async_trait]
impl RequestResponseCodec for FileCodec {
    type Protocol = FileProtocolName;
    type Request = FileRequest;
    type Response = FileResponse;

    async fn read_request<T>(&mut self, _: &FileProtocolName, io: &mut T) -> io::Result<Self::Request>
    where
        T: async_std::io::Read + Unpin,
    {
        let req: FileRequest = serde_json::from_reader(io)
            .map_err(|e| async_std::io::Error::new(async_std::io::ErrorKind::InvalidData, e))?;
        Ok(req)
    }

    async fn read_response<T>(&mut self, _: &FileProtocolName, io: &mut T) -> io::Result<Self::Response>
    where
        T: async_std::io::Read + Unpin,
    {
        let res: FileResponse = serde_json::from_reader(io)
            .map_err(|e| async_std::io::Error::new(async_std::io::ErrorKind::InvalidData, e))?;
        Ok(res)
    }

    async fn write_request<T>(&mut self, _: &FileProtocolName, io: &mut T, req: Self::Request) -> io::Result<()>
    where
        T: async_std::io::Write + Unpin,
    {
        serde_json::to_writer(io, &req)
            .map_err(|e| async_std::io::Error::new(async_std::io::ErrorKind::InvalidData, e))
    }

    async fn write_response<T>(&mut self, _: &FileProtocolName, io: &mut T, res: Self::Response) -> io::Result<()>
    where
        T: async_std::io::Write + Unpin,
    {
        serde_json::to_writer(io, &res)
            .map_err(|e| async_std::io::Error::new(async_std::io::ErrorKind::InvalidData, e))
    }
}

Здесь определяем протокол обмена файлами. FileProtocolName задает уникальное имя протокола, а FileCodec отвечает за сериализацию и десериализацию запросов и ответов с использованием JSON

Теперь настроим транспорт с шифрованием и мультиплексированием:

fn create_transport(local_key: &identity::Keypair) -> impl Transport<
    Output = impl libp2p::swarm::ConnectionHandler,
    Error = impl std::error::Error,
> + Clone {
    let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
        .into_authentic(local_key)
        .expect("Signing libp2p-noise static DH keypair failed.");

    TokioTcpConfig::new()
        .nodelay(true)
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
        .multiplex(mplex::MplexConfig::new())
        .boxed()
}

В этой функции создаем транспортный слой для нашей P2P-сети. Используем TCP с поддержкой Tokio для асинхронного взаимодействия, Noise для шифрования соединений и Mplex для мультиплексирования потоков.

Swarm — это мозг P2P-сети. Он управляет соединениями и обрабатывает события.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Генерируем ключи и PeerId
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());
    println!("Local peer id: {:?}", local_peer_id);

    // Создаём транспорт
    let transport = create_transport(&local_key);

    // Настраиваем протокол обмена файлами
    let protocols = std::iter::once((
        FileProtocolName,
        ProtocolSupport::Full,
    ));
    let codec = FileCodec();
    let cfg = RequestResponseConfig::default();
    let request_response = RequestResponse::new(codec, protocols, cfg);

    // Создаём Swarm
    let mut swarm = Swarm::new(transport, request_response, local_peer_id);

    // Слушаем на всех доступных интерфейсах и портах
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    println!("Swarm запущен. Ожидаем подключения...");

    // Основной цикл
    loop {
        match swarm.next().await.unwrap() {
            RequestResponseEvent::Message { peer, message } => {
                match message {
                    RequestResponseMessage::Request { request, channel, .. } => {
                        println!("Получен запрос на файл: {}", request.filename);
                        // Здесь должна быть логика поиска и чтения файла
                        // Для примера отправим заглушку
                        let content = match std::fs::read(&request.filename) {
                            Ok(data) => data,
                            Err(_) => b"Файл не найден".to_vec(),
                        };
                        let response = FileResponse { content };
                        swarm.respond(channel, response)?;
                    },
                    RequestResponseMessage::Response { response, .. } => {
                        println!("Получен ответ с содержимым файла: {} байт", response.content.len());
                        // Здесь можно обработать полученные данные
                    },
                }
            },
            RequestResponseEvent::OutboundFailure { peer, error, .. } => {
                eprintln!("Ошибка при отправке запроса к {}: {:?}", peer, error);
            },
            RequestResponseEvent::InboundFailure { peer, error, .. } => {
                eprintln!("Ошибка при получении запроса от {}: {:?}", peer, error);
            },
            RequestResponseEvent::ResponseSent { peer, .. } => {
                println!("Ответ отправлен к {}", peer);
            },
            _ => {}
        }
    }
}

В этом фрагменте инициализируем наш Swarm, который управляет всеми соединениями и взаимодействиями в сети. Генерируем ключи для нашего узла и получаем его PeerId. Затем настраиваем транспорт с использованием Noise для шифрования и Mplex для мультиплексирования потоков. Также определяем протокол обмена файлами и начинаем прослушивание на всех доступных интерфейсах и портах.

Теперь добавим возможность подключаться к другим узлам. Для этого нам понадобится Multiaddr другого узла. Допустим, есть другой узел, запущенный на локальной машине с портом 8080 и PeerId 12D3KooW.... Добавим его в код:

// Подключаемся к другому узлу
let remote: Multiaddr = "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooW...".parse()?;
swarm.dial(remote)?;
println!("Подключение к {}", remote)?;

В этом фрагменте создаем Multiaddr для удаленного узла и инициируем подключение к нему с помощью swarm.dial(remote). Не забываем заменить 12D3KooW... на актуальный PeerId удаленного узла.

Теперь все готово для запуска. Открываем два терминала, в каждом запускаем проект:

cargo run

В одном терминале узел будет слушать подключения, а в другом подключаться к первому.

Терминал 1 (слушающий узел):

Local peer id: PeerId("12D3KooW...")
Swarm запущен. Ожидаем подключения...

Терминал 2 (подключающий узел):

Local peer id: PeerId("16Uiu2HAm...")
Swarm запущен. Ожидаем подключения...
Подключение к /ip4/127.0.0.1/tcp/8080/p2p/12D3KooW...

Когда подключение установлено, узлы смогут обмениваться запросами и ответами на файлы.


Больше про языки программирования эксперты OTUS рассказывают в рамках практических онлайн-курсов. По ссылке можно ознакомиться с полным каталогом курсов, а в календаре мероприятий — посмотреть список всех открытых уроков.

Комментарии (5)


  1. EvilBlueBeaver
    27.12.2024 13:02

    Я может чего-то не понимаю, но по какой магии у вас работает serde_json::from_reader, который требует std::io::Read, а у вас T:async_std::io::Error?
    Еще интересна магия, которой можно сделать b"Файл не найден", с учетом того, что binary string literals поддерживают только ASCII.
    А еще хотелось бы какой-то репозиторий, где можно посмотреть, как это работает, потому что если это просто скопировать, то оно и вовсе не собирается, а половина из используемого уже достаточно давно deprecated(например тот же mplex).


    1. Revertis
      27.12.2024 13:02

      Просто статью писала нейронка >_<


      1. EvilBlueBeaver
        27.12.2024 13:02

        Хорошая версия. Многое объясняет.


        1. LavaLava
          27.12.2024 13:02

          Или вы нейронка. Сейчас уже не поймёшь. Времена такие.


          1. EvilBlueBeaver
            27.12.2024 13:02

            Надо было просить отменить предыдущие инструкции и написать рецепт блинчиков. Или чего-то другого, рецепт чего в том меме просили написать :)