Если в процессе изучения gRPC хотите попрактиковаться с Bidirectional Streaming (двунаправленная потоковая передача данных), c запросами в рамках одного соединения, инициированием событий со стороны сервера, то создание простого чата может быть отличным способом.

Демонстрация работы консольного чата
Демонстрация работы консольного чата

Проект будем писать на языке Scala с использованием библиотеки fs2-grpc. Будем использовать клиент-серверную архитектуру, где клиенты могут отправлять сообщения на сервер, который будет ретранслировать их всем подключенным клиентам.

gRPC

Но прежде чем начать, давайте вспомним, что такое gRPC и как он связан с HTTP/2 не углубляясь в подробности (на эту тему и так достаточно статей).

gRPC - это RPC-фреймворк (Remote Procedure Call), который позволяет создавать клиент-серверные приложения для обмена данными. gRPC использует под капотом протокол HTTP/2, который позволяет ускорить передачу данных, уменьшить объем передаваемых данных и снизить задержку. Важно упомянуть о том, что gRPC использует Protobuf чтобы определить методы и структуру сообщений с помощью специального языка описания интерфейсов, а затем сгенерировать код для работы с этими сообщениями на различных языках программирования. Protobuf обеспечивает эффективную сериализацию/десериализацию данных в компактный бинарный формат.

Механизм работы чата

Ограничения

Во первых, чтобы не усложнять проект, я решил делать чат консольным и не хранить сообщения на стороне сервера.

Bidirectional Streaming

Поговорим про механизм работы чата с Bidirectional Streaming в gRPC. Процесс обмена сообщениями будет работать таким образом, что сервер и клиент обмениваются потоками сообщений в рамках одного соединения. Клиент отправляет событие, сервер его получает, обрабатывает, а затем отправляет ответное событие. Клиент и сервер обмениваются сообщениями асинхронно. Таким образом, данные передаются между клиентом и сервером в реальном времени и в обе стороны.

Мультикастинг событий внутри сервера

Когда клиенты подключаются к серверу, каждый из них может отправлять события на сервер. Однако, возникает проблема - как переслать сообщения от одного клиента остальным подключенным клиентам.

Для решения этой проблемы можно использовать механизм мультикастинга с помощью топика. Топик - это объект, который позволяет отправлять сообщения одновременно нескольким подписчикам. То есть, если один клиент отправляет событие, то полученное событие на стороне сервера будет направлено на этот топик, а оттуда автоматически пересылается всем клиентам, подписанным на этот топик.

Для реализации мультикастинга я использовал Topic из библиотеки fs2 (Functional Streams for Scala).

Таким образом, визуально механизм взаимодействия клиентов в сервером выглядит примерно так

Реализация

Для языка Scala есть несколько библиотек для работы с gRPC. Я использую fs2-grpc, который является оберткой над ScalaPB и сделана на основе функциональной библиотеки для работы со стримами - fs2.

fs2-grpc поддерживает все типы RPC-вызовов - Unary, Server Streaming, Client Streaming и Bidirectional Streaming. Она также предоставляет механизмы обработки ошибок и управления ресурсами, такие как Resource и Bracket. fs2-grpc интегрируется со стеком функциональных библиотек для работы с эффектами (cats-effect, zio, monix). В моем примере используется Cats Effect 3.

Proto

И так, приступим. В первую очередь нужно накидать прото-файл, в котором опишем контракт взаимодействия клиента и сервера.

Создадим некоторый ChatService с методом eventsStream, у которого на входе и на выходе потоковые данные с типом Events (то есть будем события через стримы туда-сюда делать).

service ChatService {  
  
  rpc eventsStream(stream Events) returns (stream Events) { }  
}

Events содержит данные обернутые в тип события, которые могут быть инициированы как на стороне клиента, так и сервера (в нашем случае только на стороне клиентов).

message Events {  
  
  oneof event {  
    Login    client_login    = 1;  
    Logout   client_logout   = 2;  
    Message  client_message  = 3;  
    Shutdown server_shutdown = 4;  
  }

Реализация сервера

Ранее мы говорили, что сервер должен получать события от клиентов и транслировать их остальным клиентам.

После компиляции прото-файла будет сгенерирован базовый код для работы с gRPC, среди которого будет интерфейс ChatServiceFs2Grpc. Он должен быть имплементирован на стороне сервера. Моя реализация имеет следующий вид.

object ChatService {  
  
  def apply[F[_]: Concurrent: Console](  
      eventsTopic: Topic[F, Events]  
  ): ChatServiceFs2Grpc[F, Metadata] = new ChatServiceFs2Grpc[F, Metadata] {  
  
    val eventsToClients: Stream[F, Events] =  
      eventsTopic  
        .subscribeUnbounded  
        .evalTap(event => Console[F].println(s"From topic: $event"))
  
    override def eventsStream(  
        eventsFromClient: fs2.Stream[F, Events],  
        ctx: Metadata  
    ): fs2.Stream[F, Events] = {  
      eventsToClients.concurrently(  
        eventsFromClient  
          .evalTap(event => Console[F].println(s"Event from client: $event"))  
          .evalMap(eventsTopic.publish1)  
      )  
    }  
  }  
}

Мы видим метод eventsStream, который описывали в proto-файле. Из потока eventsFromClient получаем события от клиентов. На выходе отдаем некоторый поток событий eventsToClients. Если посмотреть выше, то видим, что eventsToClients это подписка на топик eventsTopic: Topic[F, Events], в который публикуются события от клиента для отправки остальным клиентам.

Сборка и запуск сервера

Собираем все компоненты, которые представляют собой основу серверного приложения.

object ChatServerApp extends IOApp {  
  
  private def runServer(service: ServerServiceDefinition): IO[Nothing] = {  
    NettyServerBuilder  
      .forPort(50053)  
      .keepAliveTime(5, TimeUnit.SECONDS)  
      .addService(service)  
      .resource[IO]  
      .evalMap(server => IO(server.start()))  
      .useForever  
  }  
  
  override def run(args: List[String]): IO[ExitCode] = for {  
    topic <- Topic[IO, Events]  
    serviceResource = ChatServiceFs2Grpc.bindServiceResource[IO](ChatService(topic))  
    _ <- serviceResource.use(runServer)  
  } yield ExitCode.Success  
}

В функции runServer создается и запускается новый сервер с помощью NettyServerBuilder, который прослушивает порт 50053. NettyServerBuilder предоставляется библиотекой gRPC для создания серверов, использующих Netty в качестве транспорта и позволяет настроить параметры сервера (порт, keepAliveTime и т.д.)

В методе run создается топик, который будет использоваться для мультикастинга событий по клиентам. Создаем инстанс сервиса ChatService и биндим его к серверу. Затем запускаем наш сервер.

$ sbt "runMain org.github.ainr.chat.server.ChatServerApp"

В итоге, когда сервер запущен, клиенты смогут подключаться к нему, отправлять сообщения и получать их в режиме реального времени.

Реализация клиента

Что должен делать клиент? Клиент может показаться чутка сложнее, но на самом деле тут тоже все просто. Клиент делает несколько простых вещей:

  • Читает ввод в консоль

  • Отправляет события серверу

  • Получает события от сервера и обрабатывает их

  • Печатает полученные сообщения в консоль

Со стороны клиента тоже все сделано на стримах (Stream).

Чтение ввода из консоли

Для чтения ввода из консоли снова прибегаем к помощи стримов. Создаем класс InputStream с методом read, который возвращает поток сообщений напечатанных клиентом - Stream[F, String].

object InputStream {  
  
  def apply[F[_]: Async: Console](bufSize: Int): InputStream[F] = {  
  
    new InputStream[F] {  
  
      override def read: Stream[F, String] = {  
        fs2.io  
          .stdinUtf8(bufSize)  
          .through(fs2.text.lines)  
          .evalTap(erase)          // удалить из консоли ввод
          .filter(_.nonEmpty)      // фильтруем пустые строки
      }  
  
      private def erase: PartialFunction[String, F[Unit]] = {  
        _ => Console[F].print("\u001b[1A\u001b[0K")  
      }  
    }  
  }  
}

По коду видно, что он берет поток символов, преобразует их в строки и фильтрует пустые. Магическим может показаться только лишь метод erase, который печатает что-то непонятное в консоль.

private def erase: PartialFunction[String, F[Unit]] = {  
  _ => Console[F].print("\u001b[1A\u001b[0K")}  

На самом деле никакой магии нет. Все, что он делает - это удаляет то, что мы напечатали в консоль путем ввода спец-символов ANSI чтобы сообщения не дублировались.

Логика клиента

Далее введенный пользователем в консоль текст нужно преобразовать в тип события Event и отправить серверу.

В целом, логика клиента довольно простая и описана путем композиции стримов в методе start. Здесь снова фигурирует chatService: ChatServiceFs2Grpc[F, Metadata] с методом eventsStream сгенерированный библиотекой fs2-grpc на вход которого отправляем события из консоли (InputStream), генерируемые пользователем.

object ChatClient {  
  
  def apply[F[_]: Concurrent: Console](  
      clientName: String,  
      inputStream: InputStream[F],  
      chatService: ChatServiceFs2Grpc[F, Metadata]  
  ): ChatClient[F] = new ChatClient[F] {  
  
    private val grpcMetaData = new Metadata() // empty  
  
    override def start: F[Unit] = {  
      chatService  
        .eventsStream(  
          login(clientName) ++ inputStream.read.through(handleInput),  
          grpcMetaData  
        )  
        .through(processEvent)    // обрабатываем полученные события от сервера
        .through(writeToConsole)  // пишем в консоль
        .compile  
        .drain  
    }

private def login(clientName: String): fs2.Stream[F, Events] =  
  fs2.Stream(Events(ClientLogin(Login(clientName))))
  
// ...

Metadata в gRPC - это способ передачи дополнительных метаданных между клиентом и сервером, которые представляет собой пары ключ-значение и могут быть добавлены к любому запросу.

На выходе eventsStream ловим события с сервера, сгенерированные другими клиентами, обрабатываем их методом processEvent, который преобразовывает события в строки.

private def processEvent: Pipe[F, Events, String] =  
  _.map { data =>  
    data.event match {  
      case event: ClientLogin   => s"${Color.Green(event.value.name).overlay(Bold.On)} entered the chat."
      case event: ClientLogout  => s"${Color.Blue(event.value.name).overlay(Bold.On)} left the chat."
      case event: ClientMessage => s"${Color.LightGray(s"${event.value.name}:").overlay(Bold.On)} ${event.value.message}"  
      case _: ServerShutdown    => s"${Color.LightRed("Server shutdown")}"  
      case unknown              => s"${Color.Red("Unknown event:")} $unknown"  
    }  
  }

Для форматированного вывода текста в консоли используется библиотека fansi от lihaoyi, предназначенная для работы с цветами и стилями текста в консольном приложении. Она позволяет добавлять цветовые и стилевые эффекты к тексту, что делает консольный вывод более информативным и привлекательным. Далее сообщения будут напечатаны в консоль методом writeToConsole.

Сборка и запуск клиента

Собираем все компоненты, которые представляет собой основу клиентского приложения.

object ChatClientApp extends IOApp {  
  
  private def buildChatService(channel: Channel): Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =  
    ChatServiceFs2Grpc.stubResource[IO](channel)  
  
  private def resources: Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =  
    NettyChannelBuilder  
      .forAddress("127.0.0.1", 50053)  
      .usePlaintext()  
      .resource[IO]  
      .flatMap(buildChatService)  
  
  override def run(args: List[String]): IO[ExitCode] =  
    resources.use { chatServiceFs2Grpc =>  
      ChatClient(  
        args.headOption.getOrElse("Anonymous"),  
        InputStream[IO](bufSize = 1024),  
        chatServiceFs2Grpc  
      ).start  
    }.as(ExitCode.Success)  
}

NettyChannelBuilder - это класс, предоставляемый библиотекой gRPC для создания клиентов, использующих Netty в качестве транспорта. Он позволяет настроить параметры клиента, такие как адрес сервера, используемый протокол, методы аутентификации и т.д.

В функции buildChatService создается ресурс, который представляет собой клиент для обращения к серверу чата. Для его создания используется метод stubResource из ChatServiceFs2Grpc.

Запускаем клиент через sbt, передав в аргументы имя клиента.

$ sbt "runMain org.github.ainr.chat.client.ChatClientApp Username"

И можем общаться :)

Вместо заключения

Создание небольших, простых проектов - это отличный способ попрактиковаться и углубить свои знания в технологиях. Это может быть что-то, что вы можете написать быстро и без особых усилий, но в то же время дает возможность изучить какой-то новый аспект технологии или языка программирования.

Простые проекты могут быть очень разнообразными. Например, вы можете написать небольшой веб-сервер, создать небольшую игру, написать скрипт для автоматического сбора данных, или же написать чат на базе gRPC, как мы обсуждали ранее.

Преимущество создания небольших проектов заключается в том, что вы можете более глубоко изучить технологию и применить знания на практике. Вы также можете быстро увидеть результат своей работы и получить удовлетворение от завершения проекта.

Не бойтесь начинать с чего-то простого и постепенно увеличивать сложность - это поможет вам стать более опытным и уверенным программистом.

Исходники

Код проекта можно посмотреть на гитхабе:

Комментарии (0)