Прослышал я про то, что существует 3000 наиболее часто используемых слов, подобранных на OxfordDictionary сайте. Вот тут этот список слов: www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B Ну а перевод на русский я решил брать отсюда: www.translate.ru/dictionary/en-ru Одна только проблема, все находиться на этих сайтах ну совсем не в том формате, который можно распечатать и учить. В итоге родилась идея это все запрограммировать. Но сделать это не как последовательный алгоритм, а все распаралелить. Что бы выкачивание и парсинг всех слов занял не (3000 слов * 2 сайта) / 60 секунд = 100 минут. Это если давать по 1 секунде на выкачивание и распарсивание страницы для извлечения перевода и транскрипции (в реальности думаю это в 3 раза дольше, пока соединение откроем, пока закроем и тд и тп).
Задачу я разбил сразу на два крупных блока. Первый блок, это операции ввода/вывода блокирующие — выкачивание страницы с сайта. Второй блок, это операции вычислительные, не блокирующие, но нагружающие CPU: парсинг страницы для извлечения перевода и транскрипции и добавление в словарь результатов парсинга.
Блокирующие операции я решил делать в пуле нитей, используя Future от Scala. Вычислительные задачи, решил раскидать на 3 актера Akka. Применяя методику TDD, cначала я написал тест к своим кирпичикам будущего приложения.
class Test extends FlatSpec with Matchers {
"Table Of Content extractor" should "download and extract content from Oxford Site" in {
val content:List[String] = OxfordSite.getTableOfContent
content.size should be (10)
content.find(_ == "A-B") should be (Some("A-B"))
content.find(_ == "U-Z") should be (Some("U-Z"))
}
"Words list extractor" should "download words from page" in {
val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 1)
val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
wordsTry should be a 'success
val words = wordsTry.get
words.get.find(_ == "abandon") should be (Some("abandon"))
}
"Words list extractor" should "return None from empty page" in {
val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 999)
val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
wordsTry should be a 'success
val words = wordsTry.get
words should be(None)
}
"Russian Translation" should "download translation and parse" in {
val page: Future[Try[String]] = LingvoSite.getPage("test")
val pageResultTry: Try[String]= Await.result(page,60 seconds)
pageResultTry should be a 'success
val pageResult = pageResultTry.get
pageResult.contains("тест") should be(true)
LingvoSite.parseTranslation(pageResult).get should be("тест")
}
"English Translation" should "download translation and parse" in {
val page: Future[Try[String]] = OxfordSite.getPage("test")
val pageResultTry: Try[String] = Await.result(page,60 seconds)
pageResultTry should be a 'success
val pageResult = pageResultTry.get
pageResult.contains("examination") should be(true)
OxfordSite.parseTranslation(pageResult).get should be(("test", "an examination of somebody’s knowledge or ability, consisting of questions for them to answer or activities for them to perform"))
}
}
Обратите внимание. Функции, которые могут вернуть результат вычислений имеют Try[…]. Те либо Success результат или Failure и эксепшен. Функции, которые будут часто вызывать и имеют блокирующие i/o операции имеют результат, как Future[Try[…]]. Те при вызове функции сразу возвращается Future в которой идут долгие i/o операции. Притом они идут внутри Try и могут завершится с ошибок (например соединение порвалось).
Само приложение инициализируется в Top3000WordsApp.scala. Поднимается система актеров. Создаются актеры. Запускается парсинг списка слов, который в параллель запускает выкачивание английских и русских страниц с транскрипцией и переводом. В случае успешной скачки страниц, срабатывает передача содержимое страниц актерам для парсинга, извлекающих перевод и транскрипцию. Результат перевода актеры передают конечному актеру-словарю, который акамулирует все результаты в одном месте. И по нажатию enter, система актеров идет в shutdown. И актер DictionaryActor, получая сигнал об этом, сохраняет собраный словарь в файл dictionaty.txt
object Top3000WordsApp extends App {
val system = ActorSystem("Top3000Words")
val dictionatyActor = system.actorOf(Props[DictionaryActor], "dictionatyActor")
val englishTranslationActor = system.actorOf(Props(classOf[EnglishTranslationActor], dictionatyActor), "englishTranslationActor")
val russianTranslationActor = system.actorOf(Props(classOf[RussianTranslationActor], dictionatyActor), "russianTranslationActor")
val mapGetPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
val mapGetWordsThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
start()
scala.io.StdIn.readLine()
system.terminate()
def start() = {
import concurrent.ExecutionContext.Implicits.global
Future {
OxfordSite.getTableOfContent.par.foreach(letterGroup => {
getWords(letterGroup, 1)
})
}
}
def getWords(letterGroup: String, pageNum: Int): Unit = {
implicit val executor = mapGetWordsThreadExecutionContext
OxfordSite.getWordsFromPage(letterGroup, pageNum).map(tryWords => {
tryWords match {
case Success(Some(words)) => words.par.foreach(word => {
parse(word,letterGroup,pageNum)
})
case Success(None) => Unit
case Failure(ex) => println(ex.getMessage)
}
})
}
def parse(word: String, letterGroup: String, pageNum: Int)= {
implicit val executor = mapGetPageThreadExecutionContext
OxfordSite.getPage(word).map(tryEnglishPage => {
tryEnglishPage match {
case Success(englishPage) => {
englishTranslationActor ! (word, englishPage)
getWords(letterGroup, pageNum + 1)
}
case Failure(ex) => println(ex.getMessage)
}
})
LingvoSite.getPage(word).map(_ match {
case Success(russianPage) => {
russianTranslationActor !(word, russianPage)
}
case Failure(ex) => println(ex.getMessage)
})
}
}
Обратите внимание, что алгоритм разбит на start, getWords, parse функции. Это сделано из-за того, что для каждой фазы задачи требуется свой пул нитей, который передается неявно, как ThreadExecutionContext. Сначала, у меня была всего одна функция getWords, для рекурсивного вызова. Но все работало очень медленно, так как на верхнем уровне алгоритма распаралеливание выжирало весь пул нитей и в самом низу были вечные ожидания, когда же мне дадут свободную нить, что бы поработать. А как раз в низу самое большое число операций.
Вот реализация скачивания и парсинга с сайтов.
object OxfordSite {
val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
def parseTranslation(content: String): Try[(String, String)] = {
Try {
val browser = new Browser
val doc = browser.parseString(content)
val spanElement: Element = doc >> element(".phon")
val str = Jsoup.parse(spanElement.toString).text()
val transcription = str.stripPrefix("BrE//").stripSuffix("//").trim
val translation = doc >> text(".def")
(transcription,translation)
}
}
def getPage(word: String): Future[Try[String]] = {
implicit val executor = getPageThreadExecutionContext
Future {
Try {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/definition/english/" + (word.replace(' ','-')) + "_1")
html.mkString
}
}
}
def getWordsFromPage(letterGroup: String, pageNum: Int): Future[Try[Option[List[String]]]] = {
import ExecutionContext.Implicits.global
Future {
Try {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com" +
"/wordlist/english/oxford3000/Oxford3000_" + letterGroup + "/?page=" + pageNum)
val page = html.mkString
val browser = new Browser
val doc = browser.parseString(page)
val ulElement: Element = doc >> element(".wordlist-oxford3000")
val liElements: List[Element] = ulElement >> elementList("li")
if (liElements.size > 0) Some(liElements.map(_ >> text("a")))
else None
}
}
}
def getTableOfContent: List[String] = {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B/")
val page = html.mkString
val browser = new Browser
val doc = browser.parseString(page)
val ulElement: Element = doc >> element(".hide_phone")
val liElements: List[Element] = ulElement >> elementList("li")
List(liElements.head >> text("span")) ++ liElements.tail.map(_ >> text("a"))
}
}
object LingvoSite {
val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
def parseTranslation(content: String): Try[String] = {
Try {
val browser = new Browser
val doc = browser.parseString(content)
val spanElement: Element = doc >> element(".r_rs")
spanElement >> text("a")
}
}
def getPage(word: String): Future[Try[String]] = {
implicit val executor = getPageThreadExecutionContext
Future {
Try {
val html = Source.fromURL("http://www.translate.ru/dictionary/en-ru/" + java.net.URLEncoder.encode(word,"UTF-8"))
html.mkString
}
}
}
}
Структуры данных с которыми работают актеры.
case class Word (word: String, transcription: Option[String] = None, russianTranslation:Option[String] = None, englishTranslation: Option[String] = None)
case class RussianTranslation(word:String, translation: String)
case class EnglishTranslation(word:String, translation: String)
case class Transcription(word:String, transcription: String)
Актеры, которые принимают на входе скачанные страницы для парсинга и пересылают перевод и транскрипцию актеру DictionaryActor
class EnglishTranslationActor (dictionaryActor: ActorRef) extends Actor {
println("EnglishTranslationActor")
def receive = {
case (word: String, englishPage: String) => {
OxfordSite.parseTranslation(englishPage) match {
case Success((transcription, translation)) => {
dictionaryActor ! EnglishTranslation(word,translation)
dictionaryActor ! Transcription(word,transcription)
}
case Failure(ex) => {
println(ex.getMessage)
}
}
}
}
}
class RussianTranslationActor (dictionaryActor: ActorRef) extends Actor {
println("RussianTranslationActor")
def receive = {
case (word: String, russianPage: String) => {
LingvoSite.parseTranslation(russianPage) match {
case Success(translation) => {
dictionaryActor ! RussianTranslation(word, translation)
}
case Failure(ex) => {
println(ex.getMessage)
}
}
}
}
}
Актер который накапливает в себе словарь с переводами и транскрипцией и после shutdown системы актеров записывает весь словарь в dictionary.txt
class DictionaryActor extends Actor {
println("DictionaryActor")
override def postStop(): Unit = {
println("DictionaryActor postStop")
val fileText = DictionaryActor.words.map{case (_, someWord)=> {
val transcription = someWord.transcription.getOrElse(" ")
val russianTranslation = someWord.russianTranslation.getOrElse(" ")
val englishTranslation = someWord.englishTranslation.getOrElse(" ")
List(someWord.word, transcription , russianTranslation , englishTranslation).mkString("|")
}}.mkString("\n")
scala.tools.nsc.io.File("dictionary.txt").writeAll(fileText)
println("dictionary.txt saved")
System.exit(0)
}
def receive = {
case Transcription(wordName, transcription) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(transcription = Some(transcription))
case None => Word(wordName,transcription = Some(transcription))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
case RussianTranslation(wordName, translation) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(russianTranslation = Some(translation))
case None => Word(wordName,russianTranslation = Some(translation))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
case EnglishTranslation(wordName, translation) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(englishTranslation = Some(translation))
case None => Word(wordName,englishTranslation = Some(translation))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
}
}
object DictionaryActor {
var words = scala.collection.mutable.Map[String, Word]()
}
Какие выводы? На моем Mac Book Pro этот скрипт работал в течение примерно 1 часа, пока я писал эту статью. Я его прервал нажав enter и вот какой результат:
bash-3.2$ cat ./dictionary.txt |wc -l
1809
Потом, я еще раз запустил скрипт и оставил на несколько часов. Когда вернулся, то у меня был процессор загружен 100% и были ошибки в консоле про гарбаж коллектор, по нажатию на enter моя программа не смогла сохранить результат своей работы в файл. Диагноз такой, писать на Future и par.map или par.foreach, конечно красиво и удобно, но реально очень тяжело понять как на самом деле на уровне нитей это все работает и где же узкое горлышко бутылки. В итоге я планирую все переписать на актеры. Притом, буду использовать пулы актеров. Что бы, например, 4 актера выкачивало и парсило страницы со списками слов, 18 актеров выкачивало страницы с переводами, 4 актера парсило страницы извлекая переводы и транскрипцию, ну и 1 актер складывал все в словарь.
Текущая реализация в бранче v0.1 github.com/evgenyigumnov/top3000words/tree/v0.1 Версия где все переписано на актеры с пулами будет в бранче v0.2, ну и в master, чуть позже. Может есть у кого соображения, что я делал не так, в текущей версии? Ну и может советы подкините по новой версии?
Проект на гитхабе доступен: github.com/evgenyigumnov/top3000words
Запустить тесты проекта: sbt test
Запустить приложение: sbt run
Ну и как надоест ждать, нажать enter и ознакомиться с содержимым файла dictionary.txt в текущей папке
PS
В итоге сделал финальную версию v0.2, которая парсит за 10 минут в 30 потоков. github.com/evgenyigumnov/top3000words/tree/v0.2
По окончанию работы enter нажимать не нужно. Все сделано на актерах. В Future обвернуты только блокирующие i/o тяжелые.
Комментарии (26)
elmortem
18.12.2015 16:50+2Надо было руками всё переводить, выписывая на листочек. Как раз запомнил бы.
eld0727
18.12.2015 17:40+1Просто оставлю это здесь:
https://www.coursera.org/course/reactive
https://github.com/alexandru/scala-best-practices/blob/master/sections/5-actors.md
IrinaInina
19.12.2015 19:30Листочек бумаги это здорово, конечно… Не хотите посмотреть этот сайт: lingvist.io/ru? Учить слова там очень удобно.
tangro
19.12.2015 22:29+1Всегда удивляла эта картинка со щенками. А что не так справа? Посмотрите, каждый щенок по-прежнему кушает из одной тарелки. Ну ок, с четвёртым и шестым слева не очень понятно, одна ли у них тарелка или две, но пока не доказано обратное — можно, например, считать что у каждого своя.
Bringoff
19.12.2015 22:43Слева каждый кушает со своей тарелки и лишних тарелок нет. Справа же я вижу как минимум 2 неиспользуемых тарелки.
tangro
19.12.2015 23:17Там и собак меньше. В общем правая картинка либо настолько же эффективна, как левая, либо уступает на одну «собако-тарелку». Что на 10 псах составляет 10% — странно такое падение производительности показывать как катастрофу реализации многопоточности.
abatyuk
21.12.2015 07:54`Future[Try[Something]]` — не самая лучшая идея, ведь результат выполнения Future сам по себе будет Try, например:
object Runner { def main(args: Array[String]) { import scala.concurrent.ExecutionContext.Implicits.global val future = Future[String] { throw new Exception("This is an exception of execution!") "Hello, i will never be returned!" } val awaitable = future.onComplete { case Success(s) => println(s) case Failure(e) => println("Exception with message: " + e.getMessage) } /* * Waiting for Future completion without attempt to get result, * as completion of future is already handled above */ Await.ready(future, 1.second) } }
abatyuk
21.12.2015 07:58За такие вещи надо бить по рукам долго и больно, уж простите:
object DictionaryActor { var words = scala.collection.mutable.Map[String, Word]() }
Состояние актора должно храниться в актере! Как показала практика, если не хранить состояние или поведение в акторе, то акторы не нужны — проще решать задачи при помощи обычных Scala Futuresigumnov
21.12.2015 08:42Да это я переделаю с использованием context.become вот в таком стиле
import collection.immutable.Set class MyActor extends Actor { def receive = active(Set.empty) def active(isInSet: Set[String]): Receive = { case Add(key) => context become active(isInSet + key) case Contains(key) => sender() ! isInSet(key) } }
igumnov
21.12.2015 09:02Хмм — действительно. Согласен (про Future и Try).
Я вот тут голову сломал над тем, как сделать правильно вторую версию. Я хочу, что бы каскадно прошел сигнал Terminate по всем актерам в этой логике и последний актер получив этот сигнал сохранил в файл результат работы. Но только одна проблема. У меня часть актеров иницирует блокирующие I/O (скачивание страниц) и везде все рекомендуют внутри актера это дело обвернуть в Future и в нем в случае успеха пересылать далее результат работы в следущий актер. Просто у меня сигнал Terminate пролетит по актерам раньше чем все Future отработают. И часть данных будет утеряна. Что делать не знаю. Единственное решение что приходит на ум. Не делать внутри актера Future. И что бы данный актер не стал узким горлышком, сделать пул актеров, где будут эти блокирующие операции. Но опять же не понятно как последнему актеру в цепочке послать сигнал Terminate от пула. Как определить что в пуле это последний терминирующийся актер и только он пошлет Terminate дальше?
abatyuk
21.12.2015 19:18Вечером с работы вернусь, поподробнее напишу на другой комментарий.
По поводу этого:
— Set — достаточно медленная коллекция. Точно ли она нужна? Есть сравнение от авторов по алгоритмической сложности docs.scala-lang.org/overviews/collections/performance-characteristics.html
— Если есть всего две операции (добавление и проверка наличия), и вся коллекция не отдается никогда, нет ничего плохого в использовании либо mutable коллекций, либо переменной состояния (надо быть только аккуратным с опасными операциями и их откладывать в другой эктор)
То есть можно, например, использовать переменную Vector или mutable HashSetigumnov
21.12.2015 20:19Я не про Set говорил — а вот про такой вариант
class DictionaryActor extends Actor { println("DictionaryActor") def receive = active(Map.empty) def active(words: Map[String, Word]): Receive = { case Transcription(wordName, transcription) => { val newElement = words.get(wordName) match { case Some(word) => word.copy(transcription = Some(transcription)) case None => Word(wordName,transcription = Some(transcription)) } context become active(words ++ Map(wordName -> newElement)) println(newElement) } case RussianTranslation(wordName, translation) => { val newElement = words.get(wordName) match { case Some(word) => word.copy(russianTranslation = Some(translation)) case None => Word(wordName,russianTranslation = Some(translation)) } context become active(words ++ Map(wordName -> newElement)) println(newElement) } case EnglishTranslation(wordName, translation) => { val newElement = words.get(wordName) match { case Some(word) => word.copy(englishTranslation = Some(translation)) case None => Word(wordName,englishTranslation = Some(translation)) } context become active(words ++ Map(wordName -> newElement)) println(newElement) } } }
abatyuk
22.12.2015 00:35Вполне рабочий вариант, кроме пары вещей.
Вполне можно сделать инициализацию через конструктор, например — теперь можно состояние передавать извне
class DictionaryActor(words: Map[String, Word]) extends Actor { def receive = active(words) def active(words: Map[String, Word]): Receive = { } }
Далее, зачем создавать новый мэп для каждого имени? Можно сделать проще:
context become active(words + (wordName -> newElement))
igumnov
22.12.2015 10:19Ага — кстати вот v 0.2 github.com/evgenyigumnov/top3000words/tree/v0.2
финальная — сделана по всем правилам и канонам
ну как?
igumnov
21.12.2015 08:55Хмм — действительно. Согласен (про Future и Try).
Я вот тут голову сломал над тем, как сделать правильно вторую версию. Я хочу, что бы каскадно прошел сигнал Terminate по всем актерам в этой логике и последний актер получив этот сигнал сохранил в файл результат работы. Но только одна проблема. У меня часть актеров иницирует блокирующие I/O (скачивание страниц) и везде все рекомендуют внутри актера это дело обвернуть в Future и в нем в случае успеха пересылать далее результат работы в следущий актер. Просто у меня сигнал Terminate пролетит по актерам раньше чем все Future отработают. И часть данных будет утеряна. Что делать не знаю. Единственное решение что приходит на ум. Не делать внутри актера Future. И что бы данный актер не стал узким горлышком, сделать пул актеров, где будут эти блокирующие операции. Но опять же не понятно как последнему актеру в цепочке послать сигнал Terminate от пула. Как определить что в пуле это последний терминирующийся актер и только он пошлет Terminate дальше?PHmaster
23.12.2015 15:13А зачем вам вообще нужен блокирующий I/O? В Akka есть вполне себе годная реализация неблокирующего http-клиента, построенная как раз на акторах. Я ей успешно пользовался, особых недостатков не обнаружил. Посылаете HttpRequest, получаете HttpResponse, можете настроить тайм-ауты, в общем, все довольно гибко и удобно.
igumnov
23.12.2015 15:26lollyrock.com/articles/scala-http-requests это про эту либу?
он возвращает Future — потом если на ней onComplete вызывать там неявный параметр с ExecutionContext те надо опять самому управлять нитями что бы когда результат пришел передать его другому актеру — хотя тут не очень критично так как операция передать актеру сообщение легкая
ну я попробую в v0.3 сделать на этой библиотеке и еще там кое что добавлю — интересно скорость работы выростит скрипта? я вот что то сомневаюсь или зря?PHmaster
23.12.2015 17:301. Я про spray.io, который обещали включить в Akka, когда я пользовался им последний раз где-то год назад или около того. Насколько я понял, до сих пор не включили, обещают включить с релизом akka-streams (намечено на версию 2.0). Но это совершенно не мешает пользоваться Spray сейчас как отдельной библиотекой. Ссылка на простой пример HTTP-запроса из документации
2. В классе актора можно сделать import context.dispather — он и является неявным ExecutionContext. Вручную потоками (нитями) управлять не нужно, либо используется system.dispatcher (извне), либо context.dispatcher изнутри актора, который выступает в роли ExecutionContext и исполняет фьючерсы в своем пуле потоков (нитей).
3. Для передачи результата из Future по готовности другому актору существует паттерн pipe (по той же ссылке):
import akka.pattern.pipe val future: Future[String] = ??? future pipeTo actor
PHmaster
23.12.2015 15:07Я не совсем понял, зачем во Future заворачивать Try, если сам Future внутри себя его уже имеет? Он либо завершается в будущем как Success, либо как Failure. Чтобы работать с успешным результатом дальше, используются map, flatMap, foreach, onSuccess и т.п., для обработки ошибок — recover, recoverWith, onFailure и т.п.
P.S. Странно, вроде обновил страницу перед отправкой коммента, а после отправки увидел, что об этом уже писали.
Bringoff
То, что раньше делалось питоновским скриптом, написанным на коленке, теперь разрабатывается на скале с кучей плагинов и покрывается тестами. Куда идет этот мир…