Проект на GitHub находится в стадии ранней альфы. Эта статья дополняет README проекта.
Сетевые события можно записывать и вычитывать из стороннего хранилища по своему вкусу. Каждое событие выглядит так:
public interface NetworkEvent { UUID getConnectionId(); int getSerial(); EventType getType(); byte[] getPayload(); }
Они рассказывают о некотором сетевом изменении (соединение открылось, пришли данные, соединение закрылось или оборвано) и поддаются обработке через Consumer<NetworkEvent>, причём обработчики можно выстраивать в цепочки и лабиринты для фильтрации, троттлинга, распараллеливания обработки и т.п.
Если два процесса умеют записывать такие события в хранилище вычитывать записанное с другой стороны - между ними налажено взаимодействие, при том, что прямых сетевых подключений друг к другу они не производят.
Пример использования этого очевиднейшего принципа - HTTP-прокси. Один процесс - приёмник прокси, "мама", то, что слушает на порту 3128 обычно. Он соединения принимает, но вместо того, чтобы как классический прокси тут же обратиться на запрошенные адреса и передать им полученные запросы - сохраняет запросы в виде NetworkEvent и кладёт в хранилище. А в это время совсем в другом месте передатчик прокси, "папа", уже вычитывает запросы из хранилища, открывает соединения на своём сетевом стеке, получает ответы и передаёт их через NetworkEvent и хранилище "маме".
Вся суть моего прототипа в том, что транспорт (чтение и запись хранилища) пишет пользователь под свои сиюминутные нужды. Я приведу пример транспорта на основе H2SQL практически целиком. Чтение:
public class H2Retrieval extends PollingRetrievalSupport { private final String connectionUrl; private final String tableName; private final Consumer<NetworkEvent> targetConsumer; private final AtomicLong lastProcessedId = new AtomicLong(); public H2Retrieval(String databasePath, String tableName, Consumer<NetworkEvent> targetConsumer, long pollIntervalMs) { super(pollIntervalMs); this.connectionUrl = String.format( "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000", Paths.get(databasePath).toAbsolutePath() ); this.tableName = tableName; this.targetConsumer = targetConsumer; } protected void poll() { String selectSQL = String.format(""" SELECT id, connection_id, serial, event_type, payload FROM %s WHERE id > ? ORDER BY id ASC """, tableName); try (Connection conn = DriverManager.getConnection(connectionUrl); PreparedStatement pstmt = conn.prepareStatement(selectSQL)) { pstmt.setLong(1, lastProcessedId.get()); long eventId = lastProcessedId.get(); try (ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { eventId = rs.getLong("id"); NetworkEvent event = extractEvent(rs); targetConsumer.accept(event); } } finally { long finalEventId = eventId; lastProcessedId.updateAndGet(val -> Math.max(val, finalEventId)); } } catch (SQLException e) { throw new RuntimeException("Failed to poll events", e); } } private NetworkEvent extractEvent(ResultSet rs) throws SQLException { UUID connectionId = (UUID) rs.getObject("connection_id"); int serial = rs.getInt("serial"); EventType type = EventType.valueOf(rs.getString("event_type")); byte[] payload = rs.getBytes("payload"); return NetworkEvent.create(connectionId, serial, type, payload); } }
И запись:
public class H2Storage implements Consumer<NetworkEvent> { private final String connectionUrl; private final String tableName; public H2Storage(String databasePath, String tableName) { this.connectionUrl = String.format( "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000", Paths.get(databasePath).toAbsolutePath() ); this.tableName = tableName; } @Override public void accept(NetworkEvent event) { String insertSQL = String.format(""" INSERT INTO %s (connection_id, serial, event_type, payload) VALUES (?, ?, ?, ?) """, tableName); try (Connection conn = DriverManager.getConnection(connectionUrl); PreparedStatement pstmt = conn.prepareStatement(insertSQL)) { pstmt.setObject(1, event.getConnectionId()); pstmt.setInt(2, event.getSerial()); pstmt.setString(3, event.getType().name()); pstmt.setBytes(4, event.getPayload()); pstmt.executeUpdate(); } catch (SQLException e) { throw new RuntimeException("Failed to store network event", e); } } }
На этом, в общем-то, всё. Я надеюсь, что этот интерфейс не будет становиться сложнее. Он может стать даже проще, если Muxalma посредством своих цепочек преобразователей станет умнее - научится компенсировать потерю пакетов, шейпингу трафика, backpressure и т.п. Реализации этих интерфейсов достаточно, чтобы заработал HTTP-прокси на основе Netty через хранилище, избранное вами. Отслеживание судьбы HTTP-соединений и мультиплекс возьмёт на себя Муксалма.
И здесь вы можете сказать - господин хороший, не ломитесь ли вы в открытую дверь со своей поделкой из трёх с половиной классов? На это я отвечу, что хотел начать именно эту дискуссию, т.к. дверь такую сходу не нашёл, а ещё я писатель, а не читатель, и широко известное в узких кругах всеми признанное решение просто не найду.
Void-Cowboy
сходу вижу проблему с переполнением и прочим если захотят "навредить" пользователю такого эрзац S3 хранилища (про инъекции не говорю, рассматриваем данный код как пример концепции)
то есть нужен уровень авторизации
а далее мы пришли к брокеру, то есть тот же NATS JetStream полностью закроет по функционалу описанное вами, плюс куча фишек сверху начиная от вариаций хранилища и заканчивая нормальной архитектурой для построения сети
alamar Автор
А покажете пример реализации хранилища на основе NATS JetStream?
Безопасность, авторизацию, многопользовательскую поддержку можно сделать фансервисом или в ядре. Данные подписывать, в частности.
Void-Cowboy
это вам в документацию за примерами, там явно описываются варианты с "бессрочным хранением" хотя и не рекомендуют так делать
я в целом не понял зачем вообще такое лепить, причем на уровне ядра (раз уж авторизации и прочее будут выше). Какую задачу решает такой подход?
alamar Автор
А зачем нужно бессрочное хранилище, разве что вам нужно соблюдать пакет Яровой? Если нет, удаляйте события после использования или вовсе используйте эфемерное хранилище.
Как я понял, NATS JetStream это очередь, она без сомнения лучше будет себя чувствовать в качестве хранилища, чем SQL, который приведён для примера. Поднимите Муксалму с NATS JetStream, похвалитесь результатами спидтеста...
Void-Cowboy
почитайте документацию. JetStream может быть чем угодно - очередью, зайчиком, хранилищем. Точнее основная цель там именно гарантированный брокер даже при разрывах и потерях, но границы параметризации очень гибкие (я его вообще для примера привел, похожего хватает на разных языках, задача типовая)
вы так и не ответили зачем. какие задачи должен решать ваш подход
alamar Автор
Мой подход реализует (как частный случай) HTTP-прокси и прохождение по нему трафика между приёмником и передатчиком. JetStream имеет свой разнесённый HTTP-прокси? Если нет, то теперь, с Муксалмой - да :)
Я допускаю, что совсем непонятно объяснил в статье, я только учусь. Муксалма это не хранилище. Это то, что использует ваше хранилище для своих пакетиков.
Void-Cowboy
NATS позволяет строить распределенные децентрализованные сети что позволит сообщению от одной точки быть прочитанным в другой точке, которая не имеет прямой связи меж собой
но это лирика, распределенку можно и "в лоб" на том же Играсиле построить очень легко
Я все равно не понял зачем http-прокси. Точнее у меня у самого валяется самопис для проксирования http что бы можно было прокидывать с хостинга доступ к ресурсам на VPS не засвечивая сам VPS (и у меня там все, вплоть до http/2.0 и вебсокетов поддерживается потому как чаше всего прометеус прокидываю) но у меня там нет вообще никакой базы, прокси же. Что прочитал то и отдал.
Хранение что бы отдать через время и/или иному подключению это уже задачи брокеров, не важно натс, кроль или иные