На Хабре не так уж много статей, посвященных Vert.x, раз, два и обчёлся. Поэтому решил внести свой вклад и опубликовать небольшой урок, в котором рассказано, как написать простой чат с помощью Vert.x 3.



Содержание


  1. Что такое Vert.x?
  2. О чате
  3. Структура проекта
  4. Сервер
  5. Клиент
  6. Тестирование
  7. Сборка и запуск исполняемого модуля
  8. Полный исходный код
  9. Полезные ресурсы

Что такое Vert.x?


Vert.x это событийно-ориентированный фреймворк работающий на JVM. На данный момент последняя версия этого фреймворка 3.2. Vert.x 3 предоставляет следующие возможности:

  • Мультиязычность. Компоненты приложения могут быть разработаны на Java, JavaScript, Scala, Python, Ruby, Groovy, а также Clojure;
  • Параллелизм. Довольно-таки простая модель параллелизма, освобождающая от хлопот многопоточного программирования;
  • Асинхронность. Простая модель асинхронного взаимодействия без блокировки;
  • Распределенная шина событий. Включающая как клиентскую, так и серверную стороны. Играет непосредственно главную роль в нашем чате;
  • Java 8. Vert.x 3 требует версии Java не ниже 8.

Более подробно о чате


Приложение запускается на сервере, после развертывания публикуется адрес анонимного чата, к которому можно присоединиться, через любой браузер. По этому адресу приложение транслирует в реальном времени сообщения от всех пользователей.



Поехали!


Разработку будем вести в IntelliJ IDEA 15, достаточно Community-версии.

Структура проекта


Создаем maven-проект. К сожалению готового архетипа, для vert.x 3 нет (хотя для 2 существует), поэтому генерим обычный maven-проект. Его конечная структура будет иметь следующий вид:

структура
src
+---main
|   +---java
|   |   |   Server.java
|   |   |   VerticleLoader.java
|   |   |
|   |   \---webroot
|   |           date-format.js
|   |           index.html
|   |           vertx-eventbus.js
|   |
|   \---resources
\---test
    \---java
            ChatTest.java


В pom.xml задаем следующие зависимости. Где vertx-core библиотека поддержки Verticles (более подробно, что это такое, немного дальше), vertx-web – позволяет использовать обработчик событий (и не только) и vertx-unit – для модульного тестирования.

pom.xml
<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-unit</artifactId>
        <version>3.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>


Сервер


Особенностью данного фреймворка является то, что все компоненты должны быть представлены в виде Verticle.

Verticle – это некоторый аналог сервлета, является атомарной единицей развёртывания. Сами разработчики описывают Verticle, как нечто похожее на актера в модели акторов. Собственно, эта конструкция позволяет организовать высокую степень параллелизма и асинхронности, чем и славится Vert.x. В реализации нашего сервера, мы наследуем абстрактный класс AbstractVerticle.

Переопределяемый нами метод start() является точкой входа в программу. Сначала выполняется развертывание приложения – функция deploy(), затем вешается обработчик – метод handle().

Server.java
public class Server extends AbstractVerticle {
    private Logger log = LoggerFactory.getLogger(Server.class);
    private SockJSHandler handler = null;
    private AtomicInteger online = new AtomicInteger(0);

    //точка входа.
    @Override
    public void start() throws Exception {

        if (!deploy()) {
            log.error("Failed to deploy the server.");
            return;
        }

        handle();
    }

    //...
}


Для развертывания приложения необходимо получить свободный порт, в случае, если не удалось его получить, в hostPort будет отрицательное значение. Далее создаем роутер, указываем для него адрес получателя и вешаем обработчик. И наконец, запускаем HTTP-Server на доступном порту.

Server.java
//развертывание приложения.
private boolean deploy() {
    int hostPort = getFreePort();

    if (hostPort < 0)
        return false;

    Router router = Router.router(vertx);

    //обработчик событий.
    handler = SockJSHandler.create(vertx);

    router.route("/eventbus/*").handler(handler);
    router.route().handler(StaticHandler.create());

    //запуск веб-сервера.
    vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);

    try {
        String addr = InetAddress.getLocalHost().getHostAddress();
        log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);
    } catch (UnknownHostException e) {
        log.error("Failed to get the local address: [" + e.toString() + "]");
        return false;
    }

    return true;
}


Процесс получения свободного порта представлен во фрагменте кода ниже. Сначала проверяется static-поле PROCESS_ARGS на наличие аргументов запуска приложения, одним из которых может быть порт развёртывания приложения, заданный пользователем. В случае, если порт не был задан, используется порт по умолчанию: 8080.

Server.java
//получение свободного порта для развертывания приложения.
private int getFreePort() {
    int hostPort = 8080;

    //если порт задан в качестве аргумента,
    // при запуске приложения.
    if (Starter.PROCESS_ARGS != null
            && Starter.PROCESS_ARGS.size() > 0) {
        try {
            hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
        } catch (NumberFormatException e) {
            log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
        }
    }

    //если некорректно указан порт.
    if (hostPort < 0 || hostPort > 65535)
        hostPort = 8080;

    return getFreePort(hostPort);
}


Если в качестве аргумента конструктора создания сокета, указан параметр со значением 0, то в таком случае будет выдан случайный свободный порт.

Когда порт уже занят (например, порт 8080 уже используется другим приложением, но при этом, он указан в качестве аргумента запуска текущего приложения), выбрасывается исключение BindException, в таком случае выполняется повторная попытка получения свободного порта.

Server.java
private int getFreePort(int hostPort) {
    try {
        ServerSocket socket = new ServerSocket(hostPort);
        int port = socket.getLocalPort();
        socket.close();

        return port;
    } catch (BindException e) {
        //срабатывает, когда указанный порт уже занят.
        if (hostPort != 0)
            return getFreePort(0);

        log.error("Failed to get the free port: [" + e.toString() + "]");
        return -1;
    } catch (IOException e) {
        log.error("Failed to get the free port: [" + e.toString() + "]");
        return -1;
    }
}


В случае успешного развертывания, начинается прослушивание шины событий по адресам: chat.to.server (входящие события) и chat.to.client (исходящие события).

После обработки очередного события на шине, необходимо чекнуть это событие.

Server.java
private void handle() {
    BridgeOptions opts = new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
            .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));

    //обработка приходящих событий.
    handler.bridge(opts, event -> {
        if (event.type() == PUBLISH)
            publishEvent(event);

        if (event.type() == REGISTER)
            registerEvent(event);

        if (event.type() == SOCKET_CLOSED)
            closeEvent(event);

        //обратите внимание, после обработки события
        // должен вызываться говорящий сам за себя метод.
        event.complete(true);
    });
}


Любые события, которые происходят на шине, могут быть представлены 7 следующими типами:
Тип Событие
SOCKET_CREATED возникает при создании сокета
SOCKET_CLOSED при закрытии сокета
SEND попытка отправки сообщения от клиента к серверу
PUBLISH публикация сообщения клиентом для сервера
RECEIVE уведомление от сервера, о доставленном сообщении
REGISTER попытка зарегистрировать обработчик
UNREGISTER попытка отменить зарегистрированный обработчик

В нашем приложении нам достаточно лишь обрабатывать события с типом PUBLISH, REGISTER и SOCKET_CLOSED.

Событие с типом PUBLISH срабатывает, когда кто-то из пользователей отправляет сообщение в чат.
REGISTER – срабатывает тогда, когда пользователь регистрирует обработчик. Почему не SOCKET_CREATED? Потому что, событие с типом SOCKET_CREATED предшествует – REGISTER, и, естественно, пока клиент не зарегистрирует обработчик, он не сможет получать события.
SOCKET_CLOSED – возникает, всегда когда пользователь покидает чат или когда возникает непредвиденная ситуация.

При публикации сообщения, срабатывает обработчик и вызывает метод publishEvent. Проверяется адрес назначения, в случае, если он корректен, сообщение извлекается, затем проверяется и публикуется на шине событий для всех клиентов (в т.ч. и отправителя).

Server.java
private boolean publishEvent(BridgeEvent event) {
    if (event.rawMessage() != null
            && event.rawMessage().getString("address").equals("chat.to.server")) {
        String message = event.rawMessage().getString("body");
        if (!verifyMessage(message))
            return false;

        String host = event.socket().remoteAddress().host();
        int port = event.socket().remoteAddress().port();

        Map<String, Object> publicNotice = createPublicNotice(host, port, message);
        vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
        return true;
    } else
        return false;
}


Генерация уведомления для публикации сообщения выглядит следующим образом:

Server.java
//создание уведомления о публикации сообщения.
private Map<String, Object> createPublicNotice(String host, int port, String message) {
    Date time = Calendar.getInstance().getTime();

    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "publish");
    notice.put("time", time.toString());
    notice.put("host", host);
    notice.put("port", port);
    notice.put("message", message);
    return notice;
}


Вход и выход пользователей в чат обрабатываются следующим способом:

Server.java
//тип события - регистрация обработчика.
private void registerEvent(BridgeEvent event) {
    if (event.rawMessage() != null
            && event.rawMessage().getString("address").equals("chat.to.client"))
        new Thread(() ->
        {
            Map<String, Object> registerNotice = createRegisterNotice();
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
        }).start();
}

//создание уведомления о регистрации пользователя.
private Map<String, Object> createRegisterNotice() {
    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "register");
    notice.put("online", online.incrementAndGet());
    return notice;
}

//тип события - закрытие сокета.
private void closeEvent(BridgeEvent event) {
    new Thread(() ->
    {
        Map<String, Object> closeNotice = createCloseNotice();
        vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
    }).start();
}

//создание уведомления о выходе пользвателя из чата.
private Map<String, Object> createCloseNotice() {
    Map<String, Object> notice = new TreeMap<>();
    notice.put("type", "close");
    notice.put("online", online.decrementAndGet());
    return notice;
}


Проверка публикуемого сообщения достаточно примитивная, но для примера и этого достаточно, т.е. вы её можете сами усложнить проверяя, например, на передачу скриптов в виде сообщения и прочих хаков.

Server.java
private boolean verifyMessage(String msg) {
    return msg.length() > 0
            && msg.length() <= 140;
}


Для обмена данными используется формат JSON, поэтому файл pom.xml необходимо обновить, добавив следующую зависимость:

pom.xml
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.3.1</version>
</dependency>


Также, в нашем чате будет отображаться счетчик числа онлайн-пользователей, т.к. наше приложение многопоточное, оно гарантировано должно быть thread-safety, поэтому наиболее простой способ объявить наш счётчик как AtomicInteger.

Клиент


Создаем index.html в разделе webroot, как это представленной на структуре в начале статьи. Для общения с сервером, а точнее, с шиной событий используется библиотека vertx-eventbus.js.

Для форматирования даты, будем использовать библиотеку date-format.js, довольно-таки простая и удобная. Помимо этого, в качестве html оформления будем использовать bootstrap версии 3.3.5, sockjs.js версии 0.3.4, необходимый для библиотеки vertx-eventbus.js и jquery версии 1.11.3.

Обработчик шины событий на стороне клиента выглядит следующим образом:

index.html
var online = 0; //счетчик онлайн-пользователей.
var eb = new EventBus("/eventbus/"); //шина событий.

eb.onopen = function() {
    //обработчик событий в чате.
    eb.registerHandler("chat.to.client", eventChatProcessing);
};

//обработчик событий в чате.
function eventChatProcessing(err, msg) {
    var event = jQuery.parseJSON(msg.body);

    if (event.type == 'publish') { //сообщение.
        var time = Date.parse(event.time);
        var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");

        //добавить сообщение.
        appendMsg(event.host, event.port, event.message, formattedTime);
    } else { //изменение числа пользователей.
        //type: register или close.
        online = event.online;
        $('#online').text(online);
    }
};


В случае, если тип события publish (т.е. публикация сообщения), то данные из события (event) формируются в кортеж и присоединяются к таблице сообщений. Иначе, когда тип события соответствует новому или ушедшему пользователю, просто обновляется счетчик онлайн пользователей. Функция добавления сообщения довольно-таки проста.

index.html
//добавление нового сообщения.
function appendMsg(host, port, message, formattedTime) {
    var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime + '</td><td align="left">' + host + ' [' + port + ']' + '</td><td>' + message + '</td></tr>');

    var countMsg = $('#messages tr').length;
    if (countMsg == 0)
        $('#messages').append($msg);
    else
        $('#messages > tbody > tr:first').before($msg);
}


Во время отправки сообщения, оно сначала публикуется по адресу “chat.to.server”, где его обрабатывает сервер, в случае, если сообщение проходит верификацию, оно рассылается всем клиентам, в т.ч. и отправителю.

index.html
$(document).ready(function() {
    //событие отправления сообщения.
    $('#chatForm').submit(function(evt) {
        evt.preventDefault();
        var message = $('#message').val();
        if (message.length > 0) {
            //отправление сообщения на шину событий.
            eb.publish("chat.to.server", message);
            $('#message').val("").focus();
            countChar();
        }
    });
});


Ну и наконец последний метод, который обрабатывает количество введенных символов, по условию, пользователь не может ввести сообщение длиной более 140 символов.

index.html
//счетчик введенных символов.
function countChar() {
    var len = $('#message').val().length;
    if (len > 140) {
        var msg = $('#message').val().substring(0, 140);
        $('#message').val(msg);
    } else {
        $('#charNum').text(140 - len);
        var per = 100 / 140 * len;
        $('#charNumProgressBar').css('width', per + '%').attr('aria-valuenow', per);
    }
};


Полная версия index.html, включая разметку, размещена в конце статьи.

После того, как мы написали серверную и клиентскую части, наступила очередь запуска приложения. Для запуска и удобной отладки, я рекомендую написать собственный загрузчик Verticle, хотя и есть более простая альтернатива, которую я приведу немного позже.

Единственно, значение, которое инициализирует переменную dir должно быть актуально, т.е. в действительности должен существовать такой путь. А также, переменная verticleID должна инициализироваться именем запускаемого verticle-класса, весь остальной код не подлежит изменению.

VerticleLoader.java
public class VerticleLoader {
    private static Vertx vertx;

    public static Vertx getVertx() {
        return vertx;
    }

    public static void load() {
        load(null);
    }

    public static void load(Handler<AsyncResult<String>> completionHandler) {
        VertxOptions options = new VertxOptions().setClustered(false);
        //путь до verticle-класса.
        String dir = "chat/src/main/java/";

        try {
            File current = new File(".").getCanonicalFile();
            if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
                dir = dir.substring(current.getName().length() + 1);
            }
        } catch (IOException e) {
        }

        System.setProperty("vertx.cwd", dir);
        String verticleID = Server.class.getName();

        Consumer<Vertx> runner = vertx ->
        {
            try {
                if (completionHandler == null)
                    vertx.deployVerticle(verticleID);
                else
                    vertx.deployVerticle(verticleID, completionHandler);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        };

        if (options.isClustered()) {
            Vertx.clusteredVertx(options, res ->
            {
                if (res.succeeded()) {
                    vertx = res.result();
                    runner.accept(vertx);
                } else {
                    res.cause().printStackTrace();
                }
            });
        } else {
            vertx = Vertx.vertx(options);
            runner.accept(vertx);
        }
    }

    public static void main(String[] args) {
        load();
    }
}


Теперь, когда загрузчик готов, создадим конфигурацию запуска: Run – Edit Configuration… – Add New Configuration (Alt + Insert) – Application. Указываем Main Class как VerticleLoader, сохраняем конфигурацию и запускаем.

Изображение конфигурации


PROFIT!

Обещанная альтернатива.

Альтернативная конфигурация
Необходимо создать конфигурацию запуска, как это представлено на картинке. На самом деле, класс Starter является главным классом, он содержит метод main, который-то и является точкой входа приложения.




Тестирование


Давайте протестируем разработанное нами приложение. Делать мы это будем с использованием JUnit, поэтому необходимо снова открыть pom.xml и добавить следующую зависимость:

pom.xml
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>


В setUp мы создаем экземпляр Vertx и развертываем на него нашу Verticle. В отличии от традиционного JUnit методов, все текущие методы получают еще TestContext. Задача этого объекта соблюдать асинхронность наших тестов.

В методе tearDown() для объекта TestContext вызывается asyncAssertSuccess(), он терпит неудачу, если при завершении работы Verticle возникли проблемы.

ChatTest.java
@RunWith(VertxUnitRunner.class)
public class ChatTest {
    private Vertx vertx;
    private int port = 8080;
    private Logger log = LoggerFactory.getLogger(ChatTest.class);

    //@Ignore
    @Before
    public void setUp(TestContext context) throws IOException {
        VerticleLoader.load(context.asyncAssertSuccess());
        vertx = VerticleLoader.getVertx();
    }

    //@Ignore
    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }

    //...
}


В методе loadVerticleTest мы проверяем загрузку нашего приложения. Создаем клиента и пытаемся удостовериться, что приложение, развернутое по указанному нами адресу доступно. В случае успеха, мы получаем код состояния 200.

Затем, пытаемся получить содержимое страницы, заголовок которой должен содержать текст “Chat”.

Так как запрос и ответ являются асинхронными операциями, поэтому необходимо это как-то контролировать и получать уведомления, когда тест завершился для этого используется объект Async, вызывающий всегда метод complete() по завершению теста.

ChatTest.java
@Test
public void loadVerticleTest(TestContext context) {
    log.info("*** loadVerticleTest ***");

    Async async = context.async();
    vertx.createHttpClient().getNow(port, "localhost", "/", response ->
    {
        context.assertEquals(response.statusCode(), 200);
        context.assertEquals(response.headers().get("content-type"), "text/html");
        response.bodyHandler(body ->
        {
            context.assertTrue(body.toString().contains("<title>Chat</title>"));
            async.complete();
        });
    });
}


В методе eventBusTest создается клиент шины событий и вешается обработчик. В то время, пока клиент ждет какие-либо события на шине, публикуется сообщение. Обработчик реагирует на это и проверяет тело входящего события на эквивалентность, в случае успешной проверки тест завершается вызовом async.complete().

ChatTest.java
@Test
public void eventBusTest(TestContext context) {
    log.info("*** eventBusTest ***");

    Async async = context.async();
    EventBus eb = vertx.eventBus();
    eb.consumer("chat.to.server").handler(message ->
    {
        String getMsg = message.body().toString();
        context.assertEquals(getMsg, "hello");
        async.complete();
    });

    eb.publish("chat.to.server", "hello");
}


Запускаем тесты.

Посмотреть как...
Вкладка Maven Projects – Lifecycle – test – Run [test].


Сборка и запуск исполняемого модуля


Для этого необходимо в pom.xml добавить плагин maven-shade-plugin. Где Main-Verticle в нашем случае должен указывать на класс Server.

pom.xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <manifestEntries>
                            <Main-Class>io.vertx.core.Starter</Main-Class>
                            <Main-Verticle>Server</Main-Verticle>
                        </manifestEntries>
                    </transformer>
                </transformers>
                <artifactSet/>
                <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>
            </configuration>
        </execution>
    </executions>
</plugin>


Выполняем команду Run Maven Build, после чего в каталоге target появится chat-1.0-fat.jar. Для запуска приложения исполняемый модуль и папка webroot должны находиться в одном каталоге. Чтобы развернуть наше приложение на порту 12345 необходимо выполнить команду:
java -jar chat-1.0-fat.jar 12345


На этом всё. Успехов!

Полный исходный код


Server.java
import com.google.gson.Gson;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Starter;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeEvent;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static io.vertx.ext.web.handler.sockjs.BridgeEvent.Type.*;

public class Server extends AbstractVerticle {
    private Logger log = LoggerFactory.getLogger(Server.class);
    private SockJSHandler handler = null;
    private AtomicInteger online = new AtomicInteger(0);

    //точка входа.
    @Override
    public void start() throws Exception {

        if (!deploy()) {
            log.error("Failed to deploy the server.");
            return;
        }

        handle();
    }

    //развертывание приложения.
    private boolean deploy() {
        int hostPort = getFreePort();

        if (hostPort < 0)
            return false;

        Router router = Router.router(vertx);

        //обработчик событий.
        handler = SockJSHandler.create(vertx);

        router.route("/eventbus/*").handler(handler);
        router.route().handler(StaticHandler.create());

        //запуск веб-сервера.
        vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);

        try {
            String addr = InetAddress.getLocalHost().getHostAddress();
            log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);
        } catch (UnknownHostException e) {
            log.error("Failed to get the local address: [" + e.toString() + "]");
            return false;
        }

        return true;
    }

    //получение свободного порта для развертывания приложения.
    private int getFreePort() {
        int hostPort = 8080;

        //если порт задан в качестве аргумента,
        // при запуске приложения.
        if (Starter.PROCESS_ARGS != null
                && Starter.PROCESS_ARGS.size() > 0) {
            try {
                hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
            } catch (NumberFormatException e) {
                log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
            }
        }

        //если некорректно указан порт.
        if (hostPort < 0 || hostPort > 65535)
            hostPort = 8080;

        return getFreePort(hostPort);
    }

    //если в качестве порта указано значение 0,
    // то выдается случайный свободный порт.
    private int getFreePort(int hostPort) {
        try {
            ServerSocket socket = new ServerSocket(hostPort);
            int port = socket.getLocalPort();
            socket.close();

            return port;
        } catch (BindException e) {
            //срабатывает, когда указанный порт уже занят.
            if (hostPort != 0)
                return getFreePort(0);

            log.error("Failed to get the free port: [" + e.toString() + "]");
            return -1;
        } catch (IOException e) {
            log.error("Failed to get the free port: [" + e.toString() + "]");
            return -1;
        }
    }

    private void handle() {
        BridgeOptions opts = new BridgeOptions()
                .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
                .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));

        //обработка приходящих событий.
        handler.bridge(opts, event -> {
            if (event.type() == PUBLISH)
                publishEvent(event);

            if (event.type() == REGISTER)
                registerEvent(event);

            if (event.type() == SOCKET_CLOSED)
                closeEvent(event);

            //обратите внимание, после обработки события
            // должен вызываться говорящий сам за себя метод.
            event.complete(true);
        });
    }

    //тип события - публикация сообщения.
    private boolean publishEvent(BridgeEvent event) {
        if (event.rawMessage() != null
                && event.rawMessage().getString("address").equals("chat.to.server")) {
            String message = event.rawMessage().getString("body");
            if (!verifyMessage(message))
                return false;

            String host = event.socket().remoteAddress().host();
            int port = event.socket().remoteAddress().port();

            Map<String, Object> publicNotice = createPublicNotice(host, port, message);
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
            return true;
        } else
            return false;
    }

    //создание уведомления о публикации сообщения.
    private Map<String, Object> createPublicNotice(String host, int port, String message) {
        Date time = Calendar.getInstance().getTime();

        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "publish");
        notice.put("time", time.toString());
        notice.put("host", host);
        notice.put("port", port);
        notice.put("message", message);
        return notice;
    }

    //тип события - регистрация обработчика.
    private void registerEvent(BridgeEvent event) {
        if (event.rawMessage() != null
                && event.rawMessage().getString("address").equals("chat.to.client"))
            new Thread(() ->
            {
                Map<String, Object> registerNotice = createRegisterNotice();
                vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
            }).start();
    }

    //создание уведомления о регистрации пользователя.
    private Map<String, Object> createRegisterNotice() {
        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "register");
        notice.put("online", online.incrementAndGet());
        return notice;
    }

    //тип события - закрытие сокета.
    private void closeEvent(BridgeEvent event) {
        new Thread(() ->
        {
            Map<String, Object> closeNotice = createCloseNotice();
            vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
        }).start();
    }

    //создание уведомления о выходе пользвателя из чата.
    private Map<String, Object> createCloseNotice() {
        Map<String, Object> notice = new TreeMap<>();
        notice.put("type", "close");
        notice.put("online", online.decrementAndGet());
        return notice;
    }

    //довольно простая проверка сообщения,
    // конечно её можно усложнить,
    // но для пример и этого достаточно ;)
    private boolean verifyMessage(String msg) {
        return msg.length() > 0
                && msg.length() <= 140;
    }
}


VerticleLoader.java
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.StringEscapeUtils;

import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;

public class VerticleLoader {
    private static Vertx vertx;

    public static Vertx getVertx() {
        return vertx;
    }

    public static void load() {
        load(null);
    }

    public static void load(Handler<AsyncResult<String>> completionHandler) {
        VertxOptions options = new VertxOptions().setClustered(false);
        //путь до verticle-класса.
        String dir = "chat/src/main/java/";

        try {
            File current = new File(".").getCanonicalFile();
            if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
                dir = dir.substring(current.getName().length() + 1);
            }
        } catch (IOException e) {
        }

        System.setProperty("vertx.cwd", dir);
        String verticleID = Server.class.getName();

        Consumer<Vertx> runner = vertx ->
        {
            try {
                if (completionHandler == null)
                    vertx.deployVerticle(verticleID);
                else
                    vertx.deployVerticle(verticleID, completionHandler);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        };

        if (options.isClustered()) {
            Vertx.clusteredVertx(options, res ->
            {
                if (res.succeeded()) {
                    vertx = res.result();
                    runner.accept(vertx);
                } else {
                    res.cause().printStackTrace();
                }
            });
        } else {
            vertx = Vertx.vertx(options);
            runner.accept(vertx);
        }
    }

    public static void main(String[] args) {
        load();
    }
}


ChatTest.java
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.IOException;

@RunWith(VertxUnitRunner.class)
public class ChatTest {
    private Vertx vertx;
    private int port = 8080;
    private Logger log = LoggerFactory.getLogger(ChatTest.class);

    //@Ignore
    @Before
    public void setUp(TestContext context) throws IOException {
        //развертывание нашей Verticle.
        VerticleLoader.load(context.asyncAssertSuccess());
        vertx = VerticleLoader.getVertx();
    }

    //@Ignore
    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }

    //@Ignore
    @Test
    public void loadVerticleTest(TestContext context) {
        log.info("*** loadVerticleTest ***");

        Async async = context.async();
        vertx.createHttpClient().getNow(port, "localhost", "/", response ->
        {
            //проверка доступности развернутого нами приложения.
            context.assertEquals(response.statusCode(), 200);
            context.assertEquals(response.headers().get("content-type"), "text/html");

            //проверка содержимого страницы.
            response.bodyHandler(body ->
            {
                context.assertTrue(body.toString().contains("<title>Chat</title>"));
                async.complete();
            });
        });
    }

    //@Ignore
    @Test
    public void eventBusTest(TestContext context) {
        log.info("*** eventBusTest ***");

        Async async = context.async();
        EventBus eb = vertx.eventBus();
        //ожидание события на шине.
        eb.consumer("chat.to.server").handler(message ->
        {
            String getMsg = message.body().toString();
            context.assertEquals(getMsg, "hello");
            async.complete();
        });

        //отправка сообщения на шину.
        eb.publish("chat.to.server", "hello");
    }
}


index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Chat</title>
    <meta charset="windows-1251">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script src="//cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css">
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.3/jquery.min.js"></script>
    <script src="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
    <script src="date-format.js"></script>
    <script src="vertx-eventbus.js"></script>

    <style type="text/css">
        body {
            padding-top: 40px;
            padding-bottom: 40px;
            background-color: #f5f5f5;
        }

        .received{
            width: 160px;
            font-size: 10px;
        }

        input[type=text]:focus, textarea:focus{
            box-shadow: 0 0 5px #4cae4c;
            border: 1px solid #4cae4c;
        }

        .tab-content{
            padding:5px
        }
    </style>

    <script>
        var online = 0; //счетчик онлайн-пользователей.
        var eb = new EventBus("/eventbus/"); //шина событий.

        eb.onopen = function() {
            //обработчик событий в чате.
            eb.registerHandler("chat.to.client", eventChatProcessing);
        };

        //обработчик событий в чате.
        function eventChatProcessing(err, msg) {
            var event = jQuery.parseJSON(msg.body);

			if (event.type == 'publish') {//сообщение.
				var time = Date.parse(event.time);
				var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");

				//добавить сообщение.
				appendMsg(event.host, event.port, event.message, formattedTime);
			} else { //изменение числа пользователей.
			    //type: register или close.
			    online = event.online;
				$('#online').text(online);
			}
        };

        //добавление нового сообщения.
		function appendMsg(host, port, message, formattedTime){
			var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime
					+ '</td><td align="left">' + host + ' [' + port + ']'
					+ '</td><td>' + message
					+ '</td></tr>');

            var countMsg = $('#messages tr').length;
			if (countMsg == 0)
				$('#messages').append($msg);
			else
			    $('#messages > tbody > tr:first').before($msg);
		}

        $(document).ready(function() {
            //событие отправления сообщения.
            $('#chatForm').submit(function(evt) {
                evt.preventDefault();
                var message = $('#message').val();
                if (message.length > 0) {
                    //отправление сообщения на шину событий.
                    eb.publish("chat.to.server", message);
                    $('#message').val("").focus();
                    countChar();
                }
            });
        });

        //счетчик введенных символов.
        function countChar() {
            var len = $('#message').val().length;
            if (len > 140) {
                var msg = $('#message').val().substring(0, 140);
                $('#message').val(msg);
            } else {
                $('#charNum').text(140 - len);
                var per = 100 / 140 * len;
                $('#charNumProgressBar').css('width', per+'%').attr('aria-valuenow', per);
            }
        };
    </script>
</head>
<body>
    <div class="container chat-wrapper">
        <form id="chatForm">
            <h2 align="center" class="alert alert-success">CHAT ROOM</h2>
            <fieldset>
                <div class="input-group input-group-lg">
                    <span class="input-group-addon" id="onlineIco">
                        <span class="glyphicon glyphicon-eye-open"></span>
                    </span>
                    <span class="input-group-addon" id="online">
                        <span class="glyphicon glyphicon-option-horizontal"></span>
                    </span>
                    <input type="text" maxlength="141" autocomplete="off" class="form-control"
                        placeholder="What's new?" id="message" aria-describedby="sizing-addon1"
                        onkeyup="countChar()"/>
                    <span class="input-group-btn">
                        <button class="btn btn-success" type="submit">
                            <span class="glyphicon glyphicon-send"></span>
                        </button>
                    </span>
                </div>
            </fieldset>

            <h3 id="charNum">140</h3>

            <div class="progress">
                <div id="charNumProgressBar" class="progress-bar progress-bar-success active" role="progressbar"
                     aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
                    <span class="sr-only">100% Complete</span>
                </div>
            </div>

            <div class="panel panel-success">
                <div class="panel-heading"><h3>New messages</h3></div>
                    <table id="messages" class="table table-hover" width="100%">
                        <colgroup>
                            <col style="width:10%">
                            <col style="width:10%">
                            <col style="width:10%">
                        </colgroup>
                    </table>
            </div>
        </form>
    </div>
</body>
</html>



Полезные ресурсы


Комментарии (7)


  1. gurinderu
    08.02.2016 14:59

    Забавно конечно вы свободный порт определяете, после close его может кто-то успеть занять)


  1. A_Gura
    08.02.2016 15:06

    Стоило написать статью, как релизнулась версия 3.2.1.

    Счетчик на базе AtomicInteger будет работать корректно только в случае одного экземпляра вашего чата. Как только вы запустите несколько вертиклов чата, начнете получать некорректные значения.


    1. oxaoo
      08.02.2016 16:18

      Счетчик на базе AtomicInteger будет работать корректно только в случае одного экземпляра вашего чата. Как только вы запустите несколько вертиклов чата, начнете получать некорректные значения.

      Это еще почему, можно поподробней?


      1. A_Gura
        08.02.2016 16:31

        Потому что у каждого вертикла будет свой экземпляр этого атомика.


        1. oxaoo
          08.02.2016 17:24

          Совсем нет. По умолчанию кластеризация в vert.x отключена, поэтому несколько созданных инстансов чата ни как не могут повлиять друг на друга, отправляя события в EventBus.


          1. A_Gura
            08.02.2016 17:25

            Я вам как раз про кластеризацию.