Привет, Хабр!

Сегодня рассмотрим библиотеку libp2p в Go. libp2p — это модульная библиотека для построения P2P-сетей. Libp2p выросла из проекта IPFS, но теперь активно используется в блокчейнах, мессенджерах и других децентрализованных приложениях. Главная фича библиотеки — она даёт возможность полностью контролировать P2P-коммуникации.

Libp2p разделена на несколько модулей, которые можно подключать по мере необходимости:

  1. Transport: низкоуровневое соединение (TCP, WebSocket, QUIC, WebRTC).

  2. Muxing: позволяет мультиплексировать несколько потоков данных через одно соединение (например, протокол Yamux).

  3. Security: отвечает за шифрование трафика (Noise или TLS).

  4. Peer Discovery: поиск пиров в сети.

  5. PubSub: широковещательная рассылка данных.

  6. NAT Traversal: обход NAT и создание прямых соединений.

Всё это управляется через набор опций, которые вы передаёте при создании хоста.

Синтаксис: ключевые функции и модули

Создание базового хоста

Хост — это сердце любого P2P-приложения. Он представляет узел в сети, управляет подключениями и адресами.

Функция libp2p.New

func New(opts ...Option) (host.Host, error)

opts — список опций, которые задают поведение хоста. Возвращает объект host.Host, который вы будете использовать для работы с соединениями.

import (
	"fmt"
	"log"

	libp2p "github.com/libp2p/go-libp2p"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		log.Fatalf("Ошибка создания хоста: %v", err)
	}
	defer h.Close()

	fmt.Println("Хост создан. ID:", h.ID())
}

Опции хоста

Libp2p позволяет настраивать хост с помощью опций. Самые важные из них:

libp2p.Identity

Задаёт приватный ключ, который будет использоваться для идентификации узла.

import (
	"crypto/rand"
	"github.com/libp2p/go-libp2p/core/crypto"
)

func createIdentity() crypto.PrivKey {
	priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		log.Fatalf("Ошибка генерации ключа: %v", err)
	}
	return priv
}

h, err := libp2p.New(
	libp2p.Identity(createIdentity()),
)

libp2p.ListenAddrs

Задаёт адреса, на которых хост будет принимать подключения.

import "github.com/multiformats/go-multiaddr"

addr, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/9000")
h, err := libp2p.New(
	libp2p.ListenAddrs(addr),
)

libp2p.Security

Добавляет поддержку шифрования. Например, можно использовать протокол Noise:

import "github.com/libp2p/go-libp2p/p2p/security/noise"

h, err := libp2p.New(
	libp2p.Security(noise.ID, noise.New),
)

Работа с пирами

Для подключения к другому узлу используйте host.Connect:

import (
	"context"
	"github.com/libp2p/go-libp2p/core/peer"
)

peerAddr, _ := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/QmPeerID")
err := h.Connect(context.Background(), *peerAddr)
if err != nil {
	log.Fatalf("Ошибка подключения: %v", err)
}

Libp2p использует протоколы, чтобы передавать данные между узлами. Вот как открыть поток и отправить сообщение:

import "github.com/libp2p/go-libp2p/core/network"

stream, err := h.NewStream(context.Background(), peerID, "/my-protocol/1.0.0")
if err != nil {
	log.Fatalf("Ошибка создания потока: %v", err)
}

_, err = stream.Write([]byte("Привет, P2P!"))
if err != nil {
	log.Fatalf("Ошибка записи: %v", err)
}

PubSub

Если нужно реализовать распределённый чат или передачу событий, используйте модуль PubSub. Пример:

import (
	"context"
	"fmt"
	"log"

	libp2p "github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func main() {
	// Создаём хост
	h, err := libp2p.New()
	if err != nil {
		log.Fatalf("Ошибка создания хоста: %v", err)
	}

	// Создаём PubSub
	ps, err := pubsub.NewGossipSub(context.Background(), h)
	if err != nil {
		log.Fatalf("Ошибка создания PubSub: %v", err)
	}

	// Подключаемся к теме
	topic, err := ps.Join("chat-room")
	if err != nil {
		log.Fatalf("Ошибка подписки: %v", err)
	}

	// Получаем сообщения
	sub, _ := topic.Subscribe()
	go func() {
		for {
			msg, _ := sub.Next(context.Background())
			fmt.Println("Получено сообщение:", string(msg.Data))
		}
	}()

	// Отправляем сообщения
	topic.Publish(context.Background(), []byte("Привет, P2P!"))
}

Особенности NAT traversal

Libp2p поддерживает hole punching, чтобы соединять узлы за NAT. Вот как включить его:

import "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

h, err := libp2p.New(
	libp2p.EnableHolePunching(),
)

Пример P2P-файлообменника с PubSub и NAT Traversal

Создадим прототип P2P-файлообменника, который:

  • Позволяет узлам обнаруживать друг друга.

  • Передаёт метаинформацию о файлах через PubSub.

  • Реализует передачу файлов напрямую через стримы.

  • Работает за NAT благодаря NAT traversal.

Каждый узел будет запускаться с уникальным идентификатором и подключается к общей PubSub-теме file-share. Узлы обмениваются информацией о доступных файлах через PubSub. Когда один из узлов хочет скачать файл, он подключается к владельцу файла и получает данные через поток.

Код:

package main

import (
	"bufio"
	"context"
	"crypto/rand"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	crypto "github.com/libp2p/go-libp2p/core/crypto"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

const TopicName = "file-share"

type FileInfo struct {
	Name string
	Size int64
	Hash string
}

func main() {
	// Создаём P2P-хост
	host, err := libp2p.New(
		libp2p.EnableHolePunching(),
		libp2p.Identity(generateIdentity()),
	)
	if err != nil {
		log.Fatalf("Ошибка создания хоста: %v", err)
	}
	defer host.Close()

	fmt.Println("Хост запущен с ID:", host.ID())

	// Создаём PubSub
	ctx := context.Background()
	ps, err := pubsub.NewGossipSub(ctx, host)
	if err != nil {
		log.Fatalf("Ошибка создания PubSub: %v", err)
	}

	// Подключаемся к теме
	topic, err := ps.Join(TopicName)
	if err != nil {
		log.Fatalf("Ошибка подписки на тему: %v", err)
	}

	sub, err := topic.Subscribe()
	if err != nil {
		log.Fatalf("Ошибка подписки: %v", err)
	}

	// Запускаем слушатель сообщений
	go listenForMessages(ctx, sub)

	// Основной цикл
	for {
		fmt.Println("Введите команду: [share <путь к файлу>] или [download <hash>]")
		reader := bufio.NewReader(os.Stdin)
		command, _ := reader.ReadString('\n')
		command = strings.TrimSpace(command)

		if strings.HasPrefix(command, "share") {
			// Команда для публикации файла
			filePath := strings.TrimPrefix(command, "share ")
			shareFile(ctx, topic, filePath)
		} else if strings.HasPrefix(command, "download") {
			// Команда для скачивания файла
			hash := strings.TrimPrefix(command, "download ")
			downloadFile(ctx, host, hash)
		}
	}
}

// Генерация приватного ключа для узла
func generateIdentity() crypto.PrivKey {
	priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		log.Fatalf("Ошибка генерации ключа: %v", err)
	}
	return priv
}

// Обработка входящих сообщений
func listenForMessages(ctx context.Context, sub *pubsub.Subscription) {
	for {
		msg, err := sub.Next(ctx)
		if err != nil {
			log.Printf("Ошибка получения сообщения: %v", err)
			continue
		}

		fmt.Printf("Сообщение от %s: %s\n", msg.ReceivedFrom, string(msg.Data))
	}
}

// Публикация информации о файле
func shareFile(ctx context.Context, topic *pubsub.Topic, filePath string) {
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		log.Printf("Ошибка чтения файла: %v", err)
		return
	}

	// Генерируем хэш файла
	hash := fmt.Sprintf("%x", fileInfo.ModTime().UnixNano())

	// Публикуем информацию о файле
	message := fmt.Sprintf("file|%s|%d|%s", fileInfo.Name(), fileInfo.Size(), hash)
	if err := topic.Publish(ctx, []byte(message)); err != nil {
		log.Printf("Ошибка публикации сообщения: %v", err)
		return
	}

	fmt.Printf("Файл %s опубликован! Хэш: %s\n", fileInfo.Name(), hash)
}

// Скачивание файла
func downloadFile(ctx context.Context, host libp2p.Host, hash string) {
	fmt.Println("Поиск файла с хэшем:", hash)

	// В реальном приложении нужно было бы найти пира с этим файлом через PubSub или DHT
	// Здесь просто демонстрируем подключение
	peerAddr, _ := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/<peer-id>")

	err := host.Connect(ctx, *peerAddr)
	if err != nil {
		log.Printf("Ошибка подключения к узлу: %v", err)
		return
	}

	stream, err := host.NewStream(ctx, peerAddr.ID, "/file-transfer/1.0.0")
	if err != nil {
		log.Printf("Ошибка создания потока: %v", err)
		return
	}
	defer stream.Close()

	// Читаем файл из потока
	file, err := os.Create(hash + "_downloaded")
	if err != nil {
		log.Printf("Ошибка создания файла: %v", err)
		return
	}
	defer file.Close()

	fmt.Println("Скачивание начато...")
	_, err = io.Copy(file, stream)
	if err != nil {
		log.Printf("Ошибка при загрузке файла: %v", err)
		return
	}

	fmt.Println("Файл успешно скачан!")
}

Пример работает так: все узлы подключаются к общей теме file-share и обмениваются информацией о файлах. Если узел хочет поделиться файлом, он отправляет сообщение с названием, размером и хэшем файла. Другой узел, заинтересованный в этом файле, находит владельца через PubSub, подключается к нему и скачивает файл напрямую через поток. Чтобы всё было круче, можно добавить распределённый поиск узлов через DHT, шифровать передачу данных и настроить репликацию файлов, чтобы они не потерялись, если кто-то из узлов отключится.

Подробнее с библиотекой можно ознакомиться здесь.


Напоследок напоминаю про открытые уроки, которые пройдут в Otus завтра (успевайте записаться!):

  • «Коммуникация между микросервисами с помощью RabbitMQ в ASP.NET». Подробнее

  • «Хранение данных в Kubernetes: Volumes, Storages, Stateful-приложения». Подробнее

Полный список открытых уроков по разработке, а также по всем IT-направлениям можно посмотреть в календаре мероприятий.

Комментарии (1)


  1. Boomerang
    30.01.2025 06:54

    Очень интересно, но в голову вообще не приходит, для чего это можно применить. Разве что какое-то приложение похожее на торрент. У вас нет каких-то более пракических, менее выдуманных юзкейсов?