Проект на GitHub находится в стадии ранней альфы. Эта статья дополняет README проекта.

Сетевые события можно записывать и вычитывать из стороннего хранилища по своему вкусу. Каждое событие выглядит так:

public interface NetworkEvent {
    UUID getConnectionId();
    int getSerial();
    EventType getType();
    byte[] getPayload();
}

Они рассказывают о некотором сетевом изменении (соединение открылось, пришли данные, соединение закрылось или оборвано) и поддаются обработке через Consumer<NetworkEvent>, причём обработчики можно выстраивать в цепочки и лабиринты для фильтрации, троттлинга, распараллеливания обработки и т.п.

Если два процесса умеют записывать такие события в хранилище вычитывать записанное с другой стороны - между ними налажено взаимодействие, при том, что прямых сетевых подключений друг к другу они не производят.

Пример использования этого очевиднейшего принципа - HTTP-прокси. Один процесс - приёмник прокси, "мама", то, что слушает на порту 3128 обычно. Он соединения принимает, но вместо того, чтобы как классический прокси тут же обратиться на запрошенные адреса и передать им полученные запросы - сохраняет запросы в виде NetworkEvent и кладёт в хранилище. А в это время совсем в другом месте передатчик прокси, "папа", уже вычитывает запросы из хранилища, открывает соединения на своём сетевом стеке, получает ответы и передаёт их через NetworkEvent и хранилище "маме".

Вся суть моего прототипа в том, что транспорт (чтение и запись хранилища) пишет пользователь под свои сиюминутные нужды. Я приведу пример транспорта на основе H2SQL практически целиком. Чтение:

public class H2Retrieval extends PollingRetrievalSupport {
    private final String connectionUrl;
    private final String tableName;
    private final Consumer<NetworkEvent> targetConsumer;
    private final AtomicLong lastProcessedId = new AtomicLong();

    public H2Retrieval(String databasePath, String tableName, Consumer<NetworkEvent> targetConsumer, long pollIntervalMs) {
        super(pollIntervalMs);
        this.connectionUrl = String.format(
                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",
                Paths.get(databasePath).toAbsolutePath()
        );
        this.tableName = tableName;
        this.targetConsumer = targetConsumer;
    }

    protected void poll() {
        String selectSQL = String.format("""
                SELECT id, connection_id, serial, event_type, payload 
                FROM %s 
                WHERE id > ? 
                ORDER BY id ASC
                """, tableName);

        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement pstmt = conn.prepareStatement(selectSQL)) {
            pstmt.setLong(1, lastProcessedId.get());
            long eventId = lastProcessedId.get();
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    eventId = rs.getLong("id");
                    NetworkEvent event = extractEvent(rs);

                    targetConsumer.accept(event);

                }
            } finally {
                long finalEventId = eventId;
                lastProcessedId.updateAndGet(val -> Math.max(val, finalEventId));
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to poll events", e);
        }
    }

    private NetworkEvent extractEvent(ResultSet rs) throws SQLException {
        UUID connectionId = (UUID) rs.getObject("connection_id");
        int serial = rs.getInt("serial");
        EventType type = EventType.valueOf(rs.getString("event_type"));
        byte[] payload = rs.getBytes("payload");
        return NetworkEvent.create(connectionId, serial, type, payload);
    }
}

И запись:

public class H2Storage implements Consumer<NetworkEvent> {
    private final String connectionUrl;
    private final String tableName;

    public H2Storage(String databasePath, String tableName) {
        this.connectionUrl = String.format(
                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",
                Paths.get(databasePath).toAbsolutePath()
        );
        this.tableName = tableName;
    }

    @Override
    public void accept(NetworkEvent event) {
        String insertSQL = String.format("""
            INSERT INTO %s (connection_id, serial, event_type, payload) 
            VALUES (?, ?, ?, ?)
            """, tableName);

        try (Connection conn = DriverManager.getConnection(connectionUrl);
             PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {

            pstmt.setObject(1, event.getConnectionId());
            pstmt.setInt(2, event.getSerial());
            pstmt.setString(3, event.getType().name());
            pstmt.setBytes(4, event.getPayload());
            pstmt.executeUpdate();

        } catch (SQLException e) {
            throw new RuntimeException("Failed to store network event", e);
        }
    }
}

На этом, в общем-то, всё. Я надеюсь, что этот интерфейс не будет становиться сложнее. Он может стать даже проще, если Muxalma посредством своих цепочек преобразователей станет умнее - научится компенсировать потерю пакетов, шейпингу трафика, backpressure и т.п. Реализации этих интерфейсов достаточно, чтобы заработал HTTP-прокси на основе Netty через хранилище, избранное вами. Отслеживание судьбы HTTP-соединений и мультиплекс возьмёт на себя Муксалма.

И здесь вы можете сказать - господин хороший, не ломитесь ли вы в открытую дверь со своей поделкой из трёх с половиной классов? На это я отвечу, что хотел начать именно эту дискуссию, т.к. дверь такую сходу не нашёл, а ещё я писатель, а не читатель, и широко известное в узких кругах всеми признанное решение просто не найду.

Комментарии (7)


  1. Void-Cowboy
    06.06.2026 08:46

    сходу вижу проблему с переполнением и прочим если захотят "навредить" пользователю такого эрзац S3 хранилища (про инъекции не говорю, рассматриваем данный код как пример концепции)

    то есть нужен уровень авторизации

    а далее мы пришли к брокеру, то есть тот же NATS JetStream полностью закроет по функционалу описанное вами, плюс куча фишек сверху начиная от вариаций хранилища и заканчивая нормальной архитектурой для построения сети


    1. alamar Автор
      06.06.2026 08:46

      А покажете пример реализации хранилища на основе NATS JetStream?

      Безопасность, авторизацию, многопользовательскую поддержку можно сделать фансервисом или в ядре. Данные подписывать, в частности.


      1. Void-Cowboy
        06.06.2026 08:46

        это вам в документацию за примерами, там явно описываются варианты с "бессрочным хранением" хотя и не рекомендуют так делать

        я в целом не понял зачем вообще такое лепить, причем на уровне ядра (раз уж авторизации и прочее будут выше). Какую задачу решает такой подход?


        1. alamar Автор
          06.06.2026 08:46

          А зачем нужно бессрочное хранилище, разве что вам нужно соблюдать пакет Яровой? Если нет, удаляйте события после использования или вовсе используйте эфемерное хранилище.

          Как я понял, NATS JetStream это очередь, она без сомнения лучше будет себя чувствовать в качестве хранилища, чем SQL, который приведён для примера. Поднимите Муксалму с NATS JetStream, похвалитесь результатами спидтеста...


          1. Void-Cowboy
            06.06.2026 08:46

            почитайте документацию. JetStream может быть чем угодно - очередью, зайчиком, хранилищем. Точнее основная цель там именно гарантированный брокер даже при разрывах и потерях, но границы параметризации очень гибкие (я его вообще для примера привел, похожего хватает на разных языках, задача типовая)

            вы так и не ответили зачем. какие задачи должен решать ваш подход


            1. alamar Автор
              06.06.2026 08:46

              Мой подход реализует (как частный случай) HTTP-прокси и прохождение по нему трафика между приёмником и передатчиком. JetStream имеет свой разнесённый HTTP-прокси? Если нет, то теперь, с Муксалмой - да :)

              Я допускаю, что совсем непонятно объяснил в статье, я только учусь. Муксалма это не хранилище. Это то, что использует ваше хранилище для своих пакетиков.


              1. Void-Cowboy
                06.06.2026 08:46

                NATS позволяет строить распределенные децентрализованные сети что позволит сообщению от одной точки быть прочитанным в другой точке, которая не имеет прямой связи меж собой

                но это лирика, распределенку можно и "в лоб" на том же Играсиле построить очень легко

                Я все равно не понял зачем http-прокси. Точнее у меня у самого валяется самопис для проксирования http что бы можно было прокидывать с хостинга доступ к ресурсам на VPS не засвечивая сам VPS (и у меня там все, вплоть до http/2.0 и вебсокетов поддерживается потому как чаше всего прометеус прокидываю) но у меня там нет вообще никакой базы, прокси же. Что прочитал то и отдал.

                Хранение что бы отдать через время и/или иному подключению это уже задачи брокеров, не важно натс, кроль или иные