которое было вполне понятно в использовании и проблем не возникало. Но годы шли и spray, в том или ином виде, перекочевал в Akka HTTP с реализованной поддержкой WebSocket из коробки.
Для работы с WebSocket ребята из Akka предлагают нам использовать Akka Stream, тем самым упрощая нам жизнь с потоковыми данными и, одновременно, усложняя ее. Akka Stream не так прост в понимании. Далее я попытаюсь показать базовые практические примеры использования.
Коротко об Akka Stream
Это своеобразный pipeline обработки данных, каждая итерация которого что-либо делает с данными, попадающими в него. Flow делится на 3 составляющие: Source, GraphStage, Sink.
Лучше всего это показано на диаграмме из документации
Для реализации WebSocket нам потребуется реализовывать GraphStagе. Source нам предоставляет akka, это как раз и есть наш клиент с летящими от него сообщениями. А Sink — сама отправка наших сообщений клиенту.
Actor style
Пожалуй один из самых неэффективных способов обработки, но самый простой для понимания.
Идея его заключается в том, чтобы все входящие сообщения попадали в актор, и у него был ActorRef, который отправлял данные непосредственно клиенту.
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn
object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def flow: Flow[Message, Message, Any] = {
val client = system.actorOf(Props(classOf[ClientConnectionActor]))
val in = Sink.actorRef(client, 'sinkclose)
val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a ?
client ! ('income > a)
a
}
Flow.fromSinkAndSource(in, out)
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ? system.terminate())
}
class ClientConnectionActor extends Actor {
var connection: Option[ActorRef] = None
val receive: Receive = {
case ('income, a: ActorRef) ? connection = Some(a); context.watch(a)
case Terminated(a) if connection.contains(a) ? connection = None; context.stop(self)
case 'sinkclose ? context.stop(self)
case TextMessage.Strict(t) ? connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
case _ ? // ingone
}
override def postStop(): Unit = connection.foreach(context.stop)
}
На каждое подключение клиента мы создаем актор ClientConnectionActor. А также Source, который будет представлять из себя еще один актор, направляющий полученные сообщения во flow. После его создания через метод mapMaterializedValue мы получим на него ссылку. Кроме этого мы создаем Sink, который все сообщения будет отправлять в ClientConnectionActor.
Таким образом ClientConnectionActor будет получать все сообщения из сокета. Отправлять мы их будем через прилетевший ему ActorRef, который будет доставлять их клиенту.
Минусы: необходимо следить за побочными акторами; быть аккуратным с OverflowStrategy; для обработки всех сообщений у нас всего один актор, он, соотвественно, однопоточный, из-за чего могут последовать проблемы с производительностью.
Производный вариант с использованием ActorPublisher и ActorSubscriber мы рассматривать не будем, так как, судя по официальной документации, он в состоянии
Flow style
Идея данного подхода заключается в полном ипользовании Akka Stream для достижения целей. Общий вид его сводится к построению pipeline обработки входящих сообщений клиента.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn
object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def flow: Flow[Message, Message, Any] = {
Flow[Message].collect {
case TextMessage.Strict(t) ? t
}.map { text ?
TextMessage.Strict(s"echo: $text")
}
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ? system.terminate())
}
В данном случае мы обрабатываем только текстовые сообщения и изменяем их. Дальше TextMessage отправляется клиенту.
Теперь немного усложним скелет и добавим парсинг и сериализацию JSON.
trait WsIncome
trait WsOutgoing
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing
implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ? s.asJson
}
Модифицируем flow
Flow[Message]
.collect {
case tm: TextMessage ? tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ? in.runFold("")(_ + _).flatMap(in ? Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.collect {
case Say(name) ? Say(s"hello: $name")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))
Сперва мы отсекаем все бинарные сообщения, далее парсим входящий поток в JSON, обрабатываем его и сериализуем в текст для отправки клиенту.
Усложним конструкцию, добавив контекст для каждого клиента. В этом нам поможет statefulMapConcat.
class ClientContext {
@volatile var userName: Option[String] = None
}
object ClientContext {
def unapply(arg: ClientContext): Option[String] = arg.userName
}
@JsonCodec case class SetName(name: String) extends WsIncome
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing
implicit val WsIncomeDecoder: Decoder[WsIncome] =
Decoder[Say].map[WsIncome](identity)
.or(Decoder[SetName].map[WsIncome](identity))
def flow: Flow[Message, Message, Any] = {
Flow[Message]
.collect {
case tm: TextMessage ? tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ? in.runFold("")(_ + _).flatMap(in ? Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.statefulMapConcat(() ? {
val context = new ClientContext
m ? (context > m) :: Nil
})
.mapConcat {
case (c: ClientContext, SetName(name)) ?
c.userName = Some(name)
Nil
case a ? a :: Nil
}
.collect {
case (ClientContext(userName), Say(text)) ? Say(s"$userName: $text")
case (_, Say(text)) ? Say(s"unknown: $text")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))
}
Есть и другой способ: можно реализовать свой filter/map унаследовав GraphStage[FlowShape[A, A]].
class AuthFilter(auth: ws.AuthMessage ? Future[Option[UserProfile]])(implicit ec: ExecutionContext) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {
val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
@volatile var profile: Option[UserProfile] = None
setHandler(in, new InHandler {
override def onPush(): Unit = profile match {
case Some(p) ? push(out, ws.WsContextIncomeMessage(p, grab(in)))
case _ ? grab(in) match {
case a: ws.AuthMessage ? auth(a) onComplete {
case Success(p) ?
profile = p
pull(in)
case Failure(e) ? fail(out, e)
}
case _ ? pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
}
В данном варианте фильтруется все сообщения до тех пор, пока не получено сообщение на авторизацию. Если авторизация пройдет успешно, сообщения проходят дальше совместно с профилем пользователя.
И напоследок сделаем так, чтобы всем подключенным пользователям каждую секунду отправлялось текущее время:
case object Tick extends WsOutgoing
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ? s.asJson
case Tick ? Json.obj("time" > DateTime.now.toIsoDateTimeString().asJson)
}
...
val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)
...
.collect {
case (ClientContext(userName), Say(text)) ? Say(s"$userName: $text")
case (_, Say(text)) ? Say(s"unknown: $text")
}
.merge(broadcast)
.mapAsync(CORE_COUNT * 2 - 1)(out ? Future(TextMessage(out.asJson.noSpaces)))
Это базовые примеры того, как можно реализовать поддержку WebSocket в своем проекте. Пакет Akka Stream большой и разннобразный, он поможет решить довольно большой пласт задач, не переживая за масштабирование и параллелизацию.
PS: Используя новую для вас технологию в более-менее нагруженном проекте, не забывайте проводить нагрузочное тестирование, следить за памятью и горячими участками кода (в этом вам может помочь gatling). Всем добра.
Комментарии (6)
fuCtor
20.01.2017 11:40Спасибо за статью, прям актуально.
Побольше бы статей про Akka и Streams в частности. Недавно с ними разбирался, с ходу не зашло. Но после пары статей с примерами все встало на свои места.
В принципе streams позволяет представить весь data flow как некоторый поток, где и просто преобразования, и запросы по сети, и теже акторы спокойно интегрированы. Новый источник, не проблема Source — map — merge и в общий котел, надо отправить еще кроме email sms: %flow% — broadcast(2) — Sink. Красота. Ну и главный плюс back pressure из коробки.
Googolplex
20.01.2017 12:45+3Хочу сказать, что ставить GraphStage на один уровень с Sink и Source — это примерно как говорить, что у нас есть
Map
,Set
иLinkedList
. Утверждение корректное, ноSink
иSource
— это интерфейсы, обозначающие конец и начало стрима, аGraphStage
— это конкретная реализация стримов произвольной формы (GraphStage
может иметь форму Sink'ов, Source'ов, Flow'ов, BidiFlow'ов и прочего).
Основные интерфейсы в Akka Streams это Source, Flow и Sink:
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source: A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.
Sink: A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements
Flow: A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.
RunnableGraph: A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be run().GraphStage же позволяет реализовать на низком уровне произвольную логику части стрима с произвольным количеством входов и выходов. Это очень мощная штука, но ей довольно сложно пользоваться, и, как правило, она нужна для реализации абстрактных примитивов, которые потом переиспользуются, а не для непосредственно бизнес-логики приложения. Для бизнес-логики гораздо удобнее и проще использовать акторов.
solver
20.01.2017 13:17+1У вас есть небольшая недоработка.
Если сообщение будет большим, то оно просто потеряется.
Недостаточно
case TextMessage.Strict(t) ?
еще надо добавить обработку
case TextMessage.Streamed(stream) ?
Например так
case TextMessage.Streamed(stream) ? stream.runFold("")(_ + _)
AndreyNagih
А это так должно быть, что в исходниках стрелки заменились на спецсимволы? Это отголоски внедрения парсера формул в редактор?
4lex1v
В IDEA в настройках можно включить автозамену. В Scala плагине для Emacs'a тоже такая штука имеется