Попробую рассказать о своем опыте использования на примере простенькой задачи.
Недавно появилась задача извлечь описания продуктов с большого сайта (2.5 миллиона продуктов).
Сайт был выкачан с помощью утилиты wget:
C:>start wget110 --recursive --level 10 -nc --no-clobber --html-extension --exclude-directories=it,fr --convert-links http://site.com
Можно запускать несколько инстансов утилиты, будут качать параллельно. Но все равно долго, неделю где-то.
Изначально я пробовал распарсить с помощью консольной программы Xidel:
REM обход дерева каталогов и запуск утилиты для каждого файла
FOR /R ./ %%G IN (dbfamily*.html) DO xidel "%%G" --quiet --extract-file=mytemplate.txt
Файл mytemplate.txt содержал шаблон:
//td/h1 || " # " || //td[contains(text(),"ЗНАЧ")] || " # " || //td[contains(text(),"ПОЛ")] || " # " || //td[contains(text(),"ПРИСХОЖДЕНИЕ")] || " # " || //td[contains(text(),"ПЕРЕВОД")]
Но уж очень много времени занимал последовательный парсинг. Поэтому было решено сделать это параллельно на нескольких компьютерах. Удивительно, но в итоге это вылилось всего в один класс на java.
Каждая HTML-страничка парсится с помощью XPath-выражений. Документ предварительно чистится с помощью html-cleaner
Код парсинга HTML-странички:
public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
count++;
String str = "";
ArrayList list = new ArrayList();
try {
String content = readFile(path, StandardCharsets.UTF_8);
TagNode tagNode = new HtmlCleaner().clean(content);
org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
// And then use the standard JAXP interfaces to query it:
XPath xpath = XPathFactory.newInstance().newXPath();
Iterator<String> it = XPathList.iterator();
while (it.hasNext()) {
String XPath = it.next();
String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
list.add(res);
}
// System.out.println(str);
} catch (Exception e) {
str = "" + e;
list.add(str);
}
return list;
}
Вызывается так:
ArrayList Xpaths = new ArrayList(Arrays.asList("//title", "//td/h1"));
ArrayList ResultList = parseWithXPathList(param.toString(), Xpaths);
Теперь осталось только обойти дерево каталога с помощью стандартного волкера:
Files.walkFiletree(startingDir,opts,Integer.MAX_VALUE,parseFiles)
И применить к каждому файлу парсер.
if (file.toString().endsWith(".html")) {
ArrayList<String> Xpaths= new
ArrayList<String>(Arrays.asList("//title","//td/h1"));
ArrayList<String>
ResultList=parseWithXPathList(file.toString(),Xpaths);
System.out.format(" %d ) %s %n", count,""+ResultList);
//castOneAsync(ignite, "" + file);
}
Парсинг миллиона файлов на одной машине последовательно занимает ~12 часов
Попробовал распараллелить код с помощью фреймфорка GridGain, а точнее его некоммерческой версии Apache Ignite.
Работает эта штука так: вам необходимо запустить ноды (на одной или нескольких машинах), ноды найдут друг друга по сети и напишут у себя в консолях сколько там процессоров и памяти у вас организовалось в кластер (это ваши slaves). Я запустил 12 нод на 3 машинах (каждая по 4 ядра и 16 GB RAM).
После выкачки сайта (~500GB) папочка с html-ками расшаривается для доступа по сетке. Расшаренная папочка должна быть видна всем нодам (проверьте права доступа!)
Далее, вы должны написать простенькое java app в котором тоже должны стартовать master-ноду:
Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");
После этого вы сможете у нее спрашивать состояние кластера и кастовать job’ы на произвольные ноды.
Деление на master и slave тут условное. Ноды равноправны. Я называю мастером ноду на которой будет выполняться начальный код. Вообще-то, можно сделать сегментацию кластера по типам нод, но нам это сейчас ни к чему.
Код кастующий job с парсингом на ноду:
public static void castOneAsync(Ignite ignite, String param) {
// Enable asynchronous mode.
IgniteCluster cluster = ignite.cluster();
IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
// Asynchronously execute a job.
asyncCompute.call(() -> {
System.out.println("processing: " + param);
ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
System.out.format(" %d ) %s \n %n", count, "" + ResultList);
return ""+param+" :" + ResultList;
});
// Get the future for the above invocation.
IgniteFuture<String> fut = asyncCompute.future();
// Asynchronously listen for completion and print out the result.
fut.listen(f -> {
String resultStr = f.get()+" \n";
// System.out.println("Job result: " + resultStr);
count++;
try {
Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
} catch (IOException e) {
System.out.println("" + e);
}
if (count%100==0) System.out.println( "processed: "+count );
});
}
Важные моменты:
- Нода представляет из себя папку с java программой и конфигурацией в виде default-config.xml файла. Стартует из ignite.bat.
Папку с нодой копируете на произвольные машины в локальной сети. Можно запускать несколько инстансов ноды на одной машине.
Скачать ноду (Они ее называют fabric'ой)
- У всех нод должен быть один и тот же конфигурационный файл.
- Ignite поддерживает концепцию peer class loading (Zero Deployment). Это означает, что вам не надо переписывать ваш проект на все ноды. Он сам задеплоится. Это очень крутая фича. Экономит кучу времени.
Но надо включать эту фичу в конфигурационном файле. Пишут, что с выключенной фичей работает быстрее. Не проверял.
- Вы можете запустить несколько нод на одной машине и эмулировать работу с кластером.
- Да, в кластер можно добавлять и удалять ноды в процессе работы апп.
- В конфигурационном файле надо указать IP-адреса машин в кластере
- Нужна java 8, так как используются lambda
- Если остановить master node, то таски, к-е она кастанула, на других машинах умрут.
- Вы можете спрашивать у фреймворка данные о любой машине кластера: загрузка CPU, свободная память, количество job'ов, но я доверился визарду решающему на какую машину лучше кинуть job
В итоге, мне удалось уместить весь проект в один java class ~200 строк кода с комментариями. Классу нужны jar files с htmlcleaner и apache ignite.
Можно вместо html cleaner воспользоваться внешней утилитой Xidel. Она поддерживает XQuery и XPath.
Tогда надо ее прописать на всех машинах с нодами в системную переменную PATH а затем вызываьт прямо из java. Зато будете наслаждаться XQuery.
Если публикация вызовет интерес, то напишу еще про распределенный кэш, очереди и другие distributed штучки на этом фреймворке
Исходный код проекта для Eclipse
—
package gridE;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteFuture;
import org.htmlcleaner.CleanerProperties;
import org.htmlcleaner.DomSerializer;
import org.htmlcleaner.HtmlCleaner;
import org.htmlcleaner.TagNode;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Scanner;
import static java.nio.file.FileVisitResult.CONTINUE;
/**
* Created by Veaceslav Kunitki on 11/13/2015.
* This class parse files on cluster with "Apache Ignite" framework
*/
import static java.nio.file.FileVisitResult.*;
public class ParseFilesOnCluster extends SimpleFileVisitor<Path> {
Ignite ignite;
public static long count = 0; // counter of parsed files
// Java standart FileTree walker
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {
if (attr.isSymbolicLink()) {
System.out.format("Symbolic link: %s ", file);
} else if (attr.isRegularFile()) {
// System.out.format("Regular file: %s ", file);
if (file.toString().endsWith(".html") ) {
//if (file.toString().endsWith(".html") ) { // uncomment it for serial processing
//ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
// ArrayList<String>
// ResultList=parseWithXPathList(file.toString(),Xpaths);
// System.out.format(" %d ) %s %n", count,""+ResultList);
castOneAsync(ignite, "" + file); // parallel processing
}
} else {
System.out.format("Other: %s ", file);
}
return CONTINUE;
}
// Print each directory visited.
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
System.out.format("Directory: %s%n", dir);
return CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
System.err.println(exc);
return CONTINUE;
}
static String readFile(String path, Charset encoding) throws IOException {
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, encoding);
}
public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
count++;
String str = "";
ArrayList list = new ArrayList();
try {
String content = readFile(path, StandardCharsets.UTF_8);
TagNode tagNode = new HtmlCleaner().clean(content);
org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
// And then use the standard JAXP interfaces to query it:
XPath xpath = XPathFactory.newInstance().newXPath();
// String str = (String) xpath.evaluate("//div//td[contains(@id,
// 'foo')]/text()",
Iterator<String> it = XPathList.iterator();
while (it.hasNext()) {
String XPath = it.next();
String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
list.add(res);
}
// System.out.println(str);
} catch (Exception e) {
str = "" + e;
list.add(str);
}
return list;
}
/*
* Asynchronously execute a job on external PC
*/
public static void castOneAsync(Ignite ignite, String param) {
// Enable asynchronous mode.
IgniteCluster cluster = ignite.cluster();
// IgniteCompute compute1 = ignite.compute(cluster.forRemotes());
IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
// Asynchronously execute a job.
asyncCompute.call(() -> {
// Print hello world on some cluster node and wait for completion.
System.out.println("processing: " + param);
ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//li/@data-zoom"));
ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
System.out.format(" %d ) %s \n %n", count, "" + ResultList);
String text = new Scanner(new File(param.toString()), "UTF-8").useDelimiter("\\A").next();
return "{ 'url':" + param + " ,'ResultList'=" + ResultList + " }";
});
// Get the future for the above invocation.
IgniteFuture<String> fut = asyncCompute.future();
// Asynchronously listen for completion and print out the result.
fut.listen(f -> {
String resultStr = f.get() + " \n";
// System.out.println("Job result: " + resultStr);
count++;
try {
Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND ); //Warning! File must be exist, do it manual!
} catch (IOException e) {
System.out.println("" + e);
}
if (count % 100 == 0)
System.out.println("processed: " + count);
});
}
public static void main(String[] args) throws Exception {
System.out.println("# Distributed parser!");
Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");
IgniteCluster cluster = ignite.cluster();
// Compute instance over remote nodes.
IgniteCompute compute4remote = ignite.compute(cluster.forRemotes());
// Print hello message on all remote nodes.
compute4remote.broadcast(
() -> System.out.println("---===Distributed parser started===---: " + cluster.localNode().id()));
System.out.println( "Cluster ready!" );
if (true) { // start parsing job
// final Path startingDir = Paths.get("d:/home/familytree.ru/");
Path startingDir = Paths.get("\\\\SERGIU-PC\\temp"); // shared directory with HTML-files
EnumSet<FileVisitOption> opts = EnumSet.of(FileVisitOption.FOLLOW_LINKS);
ParseFiles parseFiles = new ParseFiles();
parseFiles.ignite = ignite;
// log time to file
PrintWriter writer = new PrintWriter("d:\\grid\\start.txt", "UTF-8");
String dateTime = "" + (new Date());
writer.println(dateTime + "\n");
System.out.println(dateTime + "\n");
writer.close();
System.out.println("# walking...!");
Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
// log end time
dateTime = "" + (new Date());
Files.write(Paths.get("d:\\grid\\start.txt"), dateTime.getBytes(), StandardOpenOption.APPEND);
}
}
}
POM- file с зависимостями проекта
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>gridE</groupId>
<artifactId>gridE</artifactId>
<version>0.0.1-SNAPSHOT</version>
<repositories>
<repository>
<id>GridGain External Repository</id>
<url>http://www.gridgainsystems.com/nexus/content/repositories/external</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-examples</artifactId>
<version>1.0.0-RC1</version>
</dependency>
<dependency>
<groupId>net.sourceforge.htmlcleaner</groupId>
<artifactId>htmlcleaner</artifactId>
<version>2.15</version>
</dependency>
</dependencies>
</project>
Конфигурационный файл для ноды
<?xml version="1.0" encoding="UTF-8"?>
<!--
_________ _____ __________________ _____
__ ____/___________(_)______ /__ ____/______ ____(_)_______
_ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
\____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
Copyright (C) GridGain Systems. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Ignite Spring configuration file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable grid-aware class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>
<property name="marshaller">
<bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller">
<!-- Set to false to allow non-serializable objects in examples, default is true. -->
<property name="requireSerializable" value="false"/>
</bean>
</property>
<!-- Enable events for examples. -->
<property name="includeEventTypes">
<util:constant static-field="org.apache.ignite.events.EventType.EVTS_ALL"/>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!-- Uncomment multicast IP finder to enable multicast-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
<value>192.168.4.110:47500..47509</value>
<value>192.168.4.117:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="cacheName"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
</bean>
</property>
</bean>
</beans>
Комментарии (14)
ksm
23.11.2015 21:45+1Маленькие поправки внесу для pom.xml: секция repositories не нужна, зависимости вытягиваются из апачевского репозитория; для ignite-spring и ignite-examples м.б. версия 1.4.0 (не уверен реально нужна ли зависимость на examples)?
bitec
25.11.2015 14:08Спасибо за статью, главное преимущество GG — их специализированный classloading through network, в противном случае деплоймент был бы геморойным.
Только вот что такое «кастовать»? Ужасное слово, режет слух, во всем программном мире «cast» понимается как «приводить тип» все-таки, а не что-то иноеtumikosha
25.11.2015 20:12cast spell- бросить заклинание
в смысле «выполнить код на другой машине»
да, жаргонизм канечно же
tumikosha
25.11.2015 22:26Кстати, по умолчанию у них classloading выключен. Надо включать. Свое главное преймущество скрывают. Поди узнай о нем, вот и написал статью чтобы восполнить этот пробел
sergeypid
Как Ignite соотносится с Akka? Похоже, что Ignite < Akka, но вопрос рекомендаций — в каких случаях применять Ignite?
tumikosha
Я не знаю Akka, но читал сравнения
Разница в концепте:
Akka — distributed actors
Ignite — distributed closures
Akka и Erlang для реалтайма. Они быстро гоняют сообщения между нодами.
Т.е. используются в Massively Concurrent Application.
Ignite гоняет код между нодами, т.е. это для MapReduce и BigData
В Ignite есть Zero Deployment. (нет необходимости деплоить код на все ноды, сам расползается)
В Erlang нет zero deploymenta. (Но можно сделать через метапрограмминг)
Не знаю есть ли он в Akka. Думаю что нет.
В Ignite есть много другой функциональности: distributed caching, service grid, streaming, etc.
Затрудняюсь ответить есть ли она в Akke.
kefirr
Ignite и Akka слишком разные продукты, чтобы знаки неравенства между ними ставить. Достаточно взглянуть на API.
Ignite — это распределенная SQL+noSQL БД в памяти + map/reduce и прочие виды кластерных вычислений.
Akka решает несколько другие задачи, насколько я понимаю.
bitec
Тысячу лет назад GridGain позиционировался как IMDG — грубо говоря распределенная мапа, «все данные в памяти». Прошло много лет, они уже ушли от этого термина, но в-целом главный юзкейс остается прежним — данные, распределенные по узлам в виде ключ-значение (лучшего пока не придумали), которые можно обрабатывать локально на узле. Если бы автор не использовал расшаренный диск, то правильным сценарием было бы — распределить данные по узлам, запустить код, обрабатывающий данные, а локальность (affinity) была бы достигнута автоматически
П.с. не туда ответил, веткой выше хотел