Довольно продолжительное время существовала только одна достойная реализация работы с HTTP поверх Akka — spray. К этой библиотеке пару умельцев написали расширения для WebSocket,
которое было вполне понятно в использовании и проблем не возникало. Но годы шли и spray, в том или ином виде, перекочевал в Akka HTTP с реализованной поддержкой WebSocket из коробки.
Для работы с WebSocket ребята из Akka предлагают нам использовать Akka Stream, тем самым упрощая нам жизнь с потоковыми данными и, одновременно, усложняя ее. Akka Stream не так прост в понимании. Далее я попытаюсь показать базовые практические примеры использования.

Коротко об Akka Stream


Это своеобразный pipeline обработки данных, каждая итерация которого что-либо делает с данными, попадающими в него. Flow делится на 3 составляющие: Source, GraphStage, Sink.
Лучше всего это показано на диаграмме из документации
image

Для реализации WebSocket нам потребуется реализовывать GraphStagе. Source нам предоставляет akka, это как раз и есть наш клиент с летящими от него сообщениями. А Sink — сама отправка наших сообщений клиенту.

Actor style


Пожалуй один из самых неэффективных способов обработки, но самый простой для понимания.
Идея его заключается в том, чтобы все входящие сообщения попадали в актор, и у него был ActorRef, который отправлял данные непосредственно клиенту.

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def flow: Flow[Message, Message, Any] = {
    val client = system.actorOf(Props(classOf[ClientConnectionActor]))
    val in = Sink.actorRef(client, 'sinkclose)
    val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a ?
      client ! ('income > a)
      a
    }
    Flow.fromSinkAndSource(in, out)
  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ ? system.terminate())
}

class ClientConnectionActor extends Actor {
  var connection: Option[ActorRef] = None

  val receive: Receive = {
    case ('income, a: ActorRef) ? connection = Some(a); context.watch(a)
    case Terminated(a) if connection.contains(a) ? connection = None; context.stop(self)
    case 'sinkclose ? context.stop(self)

    case TextMessage.Strict(t) ? connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
    case _ ? // ingone
  }

  override def postStop(): Unit = connection.foreach(context.stop)
}

На каждое подключение клиента мы создаем актор ClientConnectionActor. А также Source, который будет представлять из себя еще один актор, направляющий полученные сообщения во flow. После его создания через метод mapMaterializedValue мы получим на него ссылку. Кроме этого мы создаем Sink, который все сообщения будет отправлять в ClientConnectionActor.

Таким образом ClientConnectionActor будет получать все сообщения из сокета. Отправлять мы их будем через прилетевший ему ActorRef, который будет доставлять их клиенту.

Минусы: необходимо следить за побочными акторами; быть аккуратным с OverflowStrategy; для обработки всех сообщений у нас всего один актор, он, соотвественно, однопоточный, из-за чего могут последовать проблемы с производительностью.

Производный вариант с использованием ActorPublisher и ActorSubscriber мы рассматривать не будем, так как, судя по официальной документации, он в состоянии deprecated.

Flow style


Идея данного подхода заключается в полном ипользовании Akka Stream для достижения целей. Общий вид его сводится к построению pipeline обработки входящих сообщений клиента.

Скелет
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def flow: Flow[Message, Message, Any] = {
    Flow[Message].collect {
      case TextMessage.Strict(t) ? t
    }.map { text ?
      TextMessage.Strict(s"echo: $text")
    }
  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ ? system.terminate())
}

В данном случае мы обрабатываем только текстовые сообщения и изменяем их. Дальше TextMessage отправляется клиенту.


Теперь немного усложним скелет и добавим парсинг и сериализацию JSON.

Классы для сериализации
trait WsIncome
trait WsOutgoing
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
  case s: Say ? s.asJson
}


Модифицируем flow

Flow[Message]
 .collect {
   case tm: TextMessage ? tm.textStream
 }
 .mapAsync(CORE_COUNT * 2 - 1)(in ? in.runFold("")(_ + _).flatMap(in ? Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
 .collect {
   case Say(name) ? Say(s"hello: $name")
 }
 .mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))

Сперва мы отсекаем все бинарные сообщения, далее парсим входящий поток в JSON, обрабатываем его и сериализуем в текст для отправки клиенту.

Усложним конструкцию, добавив контекст для каждого клиента. В этом нам поможет statefulMapConcat.

ClientContext
class ClientContext {
  @volatile var userName: Option[String] = None
}
object ClientContext {
  def unapply(arg: ClientContext): Option[String] = arg.userName
}

@JsonCodec case class SetName(name: String) extends WsIncome
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] =
  Decoder[Say].map[WsIncome](identity)
    .or(Decoder[SetName].map[WsIncome](identity))


def flow: Flow[Message, Message, Any] = {
  Flow[Message]
    .collect {
      case tm: TextMessage ? tm.textStream
    }
    .mapAsync(CORE_COUNT * 2 - 1)(in ? in.runFold("")(_ + _).flatMap(in ? Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
    .statefulMapConcat(() ? {
      val context = new ClientContext
      m ? (context > m) :: Nil
    })
    .mapConcat {
      case (c: ClientContext, SetName(name)) ?
        c.userName = Some(name)
        Nil
      case a ? a :: Nil
    }
    .collect {
      case (ClientContext(userName), Say(text)) ? Say(s"$userName: $text")
      case (_, Say(text)) ? Say(s"unknown: $text")
    }
    .mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))
}

Есть и другой способ: можно реализовать свой filter/map унаследовав GraphStage[FlowShape[A, A]].

Пример (не адаптировано под предыдущий код)
class AuthFilter(auth: ws.AuthMessage ? Future[Option[UserProfile]])(implicit ec: ExecutionContext) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {

  val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
  val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      @volatile var profile: Option[UserProfile] = None
      setHandler(in, new InHandler {
        override def onPush(): Unit = profile match {
          case Some(p) ? push(out, ws.WsContextIncomeMessage(p, grab(in)))
          case _ ? grab(in) match {
            case a: ws.AuthMessage ? auth(a) onComplete {
              case Success(p) ?
                profile = p
                pull(in)
              case Failure(e) ? fail(out, e)
            }
            case _ ? pull(in)
          }
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
  }
}

В данном варианте фильтруется все сообщения до тех пор, пока не получено сообщение на авторизацию. Если авторизация пройдет успешно, сообщения проходят дальше совместно с профилем пользователя.

И напоследок сделаем так, чтобы всем подключенным пользователям каждую секунду отправлялось текущее время:

case object Tick extends WsOutgoing
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
   case s: Say ? s.asJson
   case Tick ? Json.obj("time" > DateTime.now.toIsoDateTimeString().asJson)
}

...

val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)

...

.collect {
  case (ClientContext(userName), Say(text)) ? Say(s"$userName: $text")
  case (_, Say(text)) ? Say(s"unknown: $text")
}
.merge(broadcast)
.mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))

Это базовые примеры того, как можно реализовать поддержку WebSocket в своем проекте. Пакет Akka Stream большой и разннобразный, он поможет решить довольно большой пласт задач, не переживая за масштабирование и параллелизацию.

PS: Используя новую для вас технологию в более-менее нагруженном проекте, не забывайте проводить нагрузочное тестирование, следить за памятью и горячими участками кода (в этом вам может помочь gatling). Всем добра.
Поделиться с друзьями
-->

Комментарии (6)


  1. AndreyNagih
    20.01.2017 08:38

    А это так должно быть, что в исходниках стрелки заменились на спецсимволы? Это отголоски внедрения парсера формул в редактор?


    1. 4lex1v
      20.01.2017 11:02

      В IDEA в настройках можно включить автозамену. В Scala плагине для Emacs'a тоже такая штука имеется


  1. fuCtor
    20.01.2017 11:40

    Спасибо за статью, прям актуально.
    Побольше бы статей про Akka и Streams в частности. Недавно с ними разбирался, с ходу не зашло. Но после пары статей с примерами все встало на свои места.
    В принципе streams позволяет представить весь data flow как некоторый поток, где и просто преобразования, и запросы по сети, и теже акторы спокойно интегрированы. Новый источник, не проблема Source — map — merge и в общий котел, надо отправить еще кроме email sms: %flow% — broadcast(2) — Sink. Красота. Ну и главный плюс back pressure из коробки.


  1. Googolplex
    20.01.2017 12:45
    +3

    Хочу сказать, что ставить GraphStage на один уровень с Sink и Source — это примерно как говорить, что у нас есть Map, Set и LinkedList. Утверждение корректное, но Sink и Source — это интерфейсы, обозначающие конец и начало стрима, а GraphStage — это конкретная реализация стримов произвольной формы (GraphStage может иметь форму Sink'ов, Source'ов, Flow'ов, BidiFlow'ов и прочего).


    Основные интерфейсы в Akka Streams это Source, Flow и Sink:


    Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:

    Source: A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

    Sink: A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

    Flow: A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

    RunnableGraph: A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be run().

    GraphStage же позволяет реализовать на низком уровне произвольную логику части стрима с произвольным количеством входов и выходов. Это очень мощная штука, но ей довольно сложно пользоваться, и, как правило, она нужна для реализации абстрактных примитивов, которые потом переиспользуются, а не для непосредственно бизнес-логики приложения. Для бизнес-логики гораздо удобнее и проще использовать акторов.


  1. solver
    20.01.2017 13:17
    +1

    У вас есть небольшая недоработка.
    Если сообщение будет большим, то оно просто потеряется.
    Недостаточно

    case TextMessage.Strict(t) ? 
    

    еще надо добавить обработку
    case TextMessage.Streamed(stream) ? 
    

    Например так
    case TextMessage.Streamed(stream) ? stream.runFold("")(_ + _)
    


    1. voooka
      20.01.2017 13:45

      В примерах с flow так и сделано. Пример для актора не хотел нагромождать.