Представляю вашему вниманию перевод статьи Tailing the MongoDB Replica Set Oplog with Scala and Akka Streams.
В этой статье я попробую объяснить, как следить за обновлениями в MongoDB Oplog при помощи Scala драйвера MongoDB и Akka Streams.
Примеры, приведенные в данной статье не следует рассматривать и использовать в продакшн среде.
Каждый из нас знает Unix команду
Если вы хотите узнать больше об
Проект созданный в данной статье удобно расположился на Github.
Пример файла build.sbt:
Также вам понадобится MongoDB Replica Set, могу порекомендовать официальный образ mongo docker.
Предполагая, что у вас уже есть установленное соединения, давай те определим запрос. Ниже приведен пример:
Как видно из запроса, мы определяем курсор типа
Полня документация на английском языке доступна тут.
В Akka Stream схема обработки потока данных определяется следующими абстракциями:
Source — Этап обработки данных только с одной точкой выхода, предоставляющий элементы данных, как только нижележащие элементы обработки данных готовы к приему.
Sink — Этап обработки данных только с одной точкой входа, запрашивающий и принимающий элементы данных с возможностью замедления поступления данных с вышележащего элемента.
Flow — Этап обработки данных только с одной точкой входа и выхода, которая соединяет поток данных и трансформирует элементы проходящие через него.
RunnableGraph — Это
В нашем случае мы будем использовать только
К сожалению нету возможности по умолчанию для интеграции Akka Streams и MongoDB драйвера, но у Akka Streams есть возможность интеграции с Reactive Streams, так же как и у недавно опубликованного, нового, официального, асинхронного MongoDB Scala драйвера. Новый драйвер использует модель
Определение
На данный момент у нас имеется объект
В этом нам поможет магия неявного преобразования. Тип
который преобразовывает тип Reactive Streams
Это было давольно легко, не так ли?
Все что вы можете себе представить, я пока просто буду выводить все данные в
или же можно использовать шорткат:
это выведет все CRUD операции из MongoDB Replica Set с начала
Вы можете задать более специфический запрос, определить базы данных и коллекции из которых вы хотите получать обновления, так же вы можете определить временные рамки для поступающих документов. Это я оставлю на вас.
Возможно вы задумались, зачем нам нужно преобразовывать
Дело в том что модель
Так что если вы заинтересованы только в переносе данных из
Как видно из статьи, слежение за MongoDB Oplog очень простая задача, особенно в Replica Set. Могут возникнуть другие подводные камни, если это будет MongoDB Sharded Cluster, это будет покрыто в следующей статье.
Конечно, этот пост не охватывает все аспекты этой темы, на пример, обработка ошибок, гарантии доставки и т.д. Это может быть реализовано различными способами и не является темой данной статьи.
Введение
В этой статье я попробую объяснить, как следить за обновлениями в MongoDB Oplog при помощи Scala драйвера MongoDB и Akka Streams.
Примеры, приведенные в данной статье не следует рассматривать и использовать в продакшн среде.
Каждый из нас знает Unix команду
tail -f
, Tailable Cursor
имеет тот же концепт. MongoDB предоставляет возможность использовать эту функцию по умолчанию и не требует дополнительных библиотек и инструментов. Что касается Oplog
— это такая же коллекция, как и все остальные и ничего нового не требуется.Если вы хотите узнать больше об
Oplog
и Tailable Cursor
, то вы можете найти больше информации в документации MongoDB: Проект созданный в данной статье удобно расположился на Github.
Библиотеки и инструменты
Пример файла build.sbt:
name := "MongoDB Replica Set oplog tailing with Akks Streams"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.1.5",
"org.mongodb.scala" %% "mongo-scala-driver" % "1.1.0",
"com.typesafe.akka" %% "akka-slf4j" % "2.4.2",
"com.typesafe.akka" %% "akka-stream" % "2.4.2"
)
Также вам понадобится MongoDB Replica Set, могу порекомендовать официальный образ mongo docker.
Запрос для слежения за MongoDB Oplog
Предполагая, что у вас уже есть установленное соединения, давай те определим запрос. Ниже приведен пример:
val collection: MongoCollection[Document] = _
val observable = collection.find(in("op", "i", "d", "u"))
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true)
Как видно из запроса, мы определяем курсор типа
tailable
без timeout
, так же поле op
, определяющее тип операции в Oplog
, должно быть CRUD операцией i/d/u
.Немного о терминологии Akka Streams
Полня документация на английском языке доступна тут.
В Akka Stream схема обработки потока данных определяется следующими абстракциями:
Source — Этап обработки данных только с одной точкой выхода, предоставляющий элементы данных, как только нижележащие элементы обработки данных готовы к приему.
Sink — Этап обработки данных только с одной точкой входа, запрашивающий и принимающий элементы данных с возможностью замедления поступления данных с вышележащего элемента.
Flow — Этап обработки данных только с одной точкой входа и выхода, которая соединяет поток данных и трансформирует элементы проходящие через него.
RunnableGraph — Это
Flow
который соединен с Source
и Sink
и готов к выполнению команды run()
.В нашем случае мы будем использовать только
Source
и Sink
, так как мы будем только следить за обновлениями без изменения поступающих данных.MongoDB Scala драйвер и Akka Stream
К сожалению нету возможности по умолчанию для интеграции Akka Streams и MongoDB драйвера, но у Akka Streams есть возможность интеграции с Reactive Streams, так же как и у недавно опубликованного, нового, официального, асинхронного MongoDB Scala драйвера. Новый драйвер использует модель
Observable
, которая может быть преобразована в Reactive Streams Publisher
всего лишь в несколько строк кода, также команда MongoDB уже привела пример преобразования на базе implicit
, который мы будем использовать, как точку соприкосновения между этими двумя библиотеками.Объявление потока Source
Определение
Source
очень легкое. Из Oplog
мы будем получать объекты Document
, это и будет типом потока Source
.val source: Source[Document, NotUsed] = _
На данный момент у нас имеется объект
FindObservable[Document]
из MongoDB Oplog
запроса и тип ресурса Source[Document, NotUsed]
, так как же нам преобразовать одно в другое?В этом нам поможет магия неявного преобразования. Тип
Source
содержит метод:def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]
который преобразовывает тип Reactive Streams
Publisher
в Source
, также у нас есть неявное преобразование из MongoDB Observable
в Publisher
. Теперь мы можем связать все части:import rxStreams.Implicits._
val collection: MongoCollection[Document] = _
val observable = collection.find(in("op", "i", "d", "u"))
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true)
val source: Source[Document, NotUsed] = Source.fromPublisher(observable)
Это было давольно легко, не так ли?
Что делать дальше?
Все что вы можете себе представить, я пока просто буду выводить все данные в
STDOUT
:source.runWith(Sink.foreach(println))
или же можно использовать шорткат:
source.runForeach(println)
это выведет все CRUD операции из MongoDB Replica Set с начала
Oplog
коллекции до конца и будет следить за новыми поступлениями.Вы можете задать более специфический запрос, определить базы данных и коллекции из которых вы хотите получать обновления, так же вы можете определить временные рамки для поступающих документов. Это я оставлю на вас.
Зачем нам это?
Возможно вы задумались, зачем нам нужно преобразовывать
Observable
в Publisher
, а потом еще и в Source
, в то время как мы могли бы просто использовать Reactive Streams Publisher
или же Observable
.Дело в том что модель
Observables
и Reactive Streams API предоставляют общие механизмы переноса данных в асинхронных рамках без потерь, когда как Akka Streams API фокусируется на трансформации потока данных.Так что если вы заинтересованы только в переносе данных из
Oplog
куда либо, вы можете придерживаться модели Observables
предоставляемой MongoDB драйвером, но если вам нужно трансформировать поток данных, то выбор падает на Akka Stream.Заключение
Как видно из статьи, слежение за MongoDB Oplog очень простая задача, особенно в Replica Set. Могут возникнуть другие подводные камни, если это будет MongoDB Sharded Cluster, это будет покрыто в следующей статье.
Конечно, этот пост не охватывает все аспекты этой темы, на пример, обработка ошибок, гарантии доставки и т.д. Это может быть реализовано различными способами и не является темой данной статьи.