Периодически у меня появляются задачи обработать большое количество файлов. Обычно это конвертирование из одного формата в другой: XSLT-трансформация, парсинг, конвертация картинок или видео. Для решения этих проблем я приспособил фреймворк GridGain In-Memory Data Fabric. Он дает возможность делать distributed computing, MapReduce, распределенные кэши и очереди, распределенную файловую систему в памяти, перемещение кода к данным, job stealing, ускорители для Hadoop и многие другие модные ныне вещи. И все это легко и под разные операционки. Вы легко можете все это пощупать под виндовс.

Попробую рассказать о своем опыте использования на примере простенькой задачи.

Недавно появилась задача извлечь описания продуктов с большого сайта (2.5 миллиона продуктов).
Сайт был выкачан с помощью утилиты wget:
   C:>start wget110 --recursive  --level 10 -nc --no-clobber    --html-extension   --exclude-directories=it,fr   --convert-links   http://site.com


Можно запускать несколько инстансов утилиты, будут качать параллельно. Но все равно долго, неделю где-то.

Изначально я пробовал распарсить с помощью консольной программы Xidel:
REM обход дерева каталогов и запуск утилиты для каждого файла
FOR /R ./  %%G IN (dbfamily*.html) DO xidel  "%%G"  --quiet    --extract-file=mytemplate.txt 

Файл mytemplate.txt содержал шаблон:
//td/h1  || " # " ||  //td[contains(text(),"ЗНАЧ")] || " # " ||  //td[contains(text(),"ПОЛ")] || " # " ||  //td[contains(text(),"ПРИСХОЖДЕНИЕ")] || " # " ||  //td[contains(text(),"ПЕРЕВОД")] 

Но уж очень много времени занимал последовательный парсинг. Поэтому было решено сделать это параллельно на нескольких компьютерах. Удивительно, но в итоге это вылилось всего в один класс на java.

Каждая HTML-страничка парсится с помощью XPath-выражений. Документ предварительно чистится с помощью html-cleaner

Код парсинга HTML-странички:
public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
	count++;
	String str = "";
	ArrayList list = new ArrayList();
	try {
		String content = readFile(path, StandardCharsets.UTF_8);
		TagNode tagNode = new HtmlCleaner().clean(content);
		org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
		// And then use the standard JAXP interfaces to query it:
		XPath xpath = XPathFactory.newInstance().newXPath();
		Iterator<String> it = XPathList.iterator();
		while (it.hasNext()) {
			String XPath = it.next();
			String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
				list.add(res);
			}

			// System.out.println(str);
		} catch (Exception e) {
			str = "" + e;
			list.add(str);
		}
		return list;
	}


Вызывается так:
ArrayList Xpaths = new ArrayList(Arrays.asList("//title", "//td/h1"));
ArrayList ResultList = parseWithXPathList(param.toString(), Xpaths);

Теперь осталось только обойти дерево каталога с помощью стандартного волкера:
Files.walkFiletree(startingDir,opts,Integer.MAX_VALUE,parseFiles)
И применить к каждому файлу парсер.
if (file.toString().endsWith(".html")) {
	 ArrayList<String> Xpaths= new
	 ArrayList<String>(Arrays.asList("//title","//td/h1"));
	 ArrayList<String>
	 ResultList=parseWithXPathList(file.toString(),Xpaths);
	 System.out.format(" %d ) %s %n", count,""+ResultList);
	  //castOneAsync(ignite, "" + file);
	}


Парсинг миллиона файлов на одной машине последовательно занимает ~12 часов
Попробовал распараллелить код с помощью фреймфорка GridGain, а точнее его некоммерческой версии Apache Ignite.
Работает эта штука так: вам необходимо запустить ноды (на одной или нескольких машинах), ноды найдут друг друга по сети и напишут у себя в консолях сколько там процессоров и памяти у вас организовалось в кластер (это ваши slaves). Я запустил 12 нод на 3 машинах (каждая по 4 ядра и 16 GB RAM).
После выкачки сайта (~500GB) папочка с html-ками расшаривается для доступа по сетке. Расшаренная папочка должна быть видна всем нодам (проверьте права доступа!)

Далее, вы должны написать простенькое java app в котором тоже должны стартовать master-ноду:
Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");


После этого вы сможете у нее спрашивать состояние кластера и кастовать job’ы на произвольные ноды.
Деление на master и slave тут условное. Ноды равноправны. Я называю мастером ноду на которой будет выполняться начальный код. Вообще-то, можно сделать сегментацию кластера по типам нод, но нам это сейчас ни к чему.

Код кастующий job с парсингом на ноду:

public static void castOneAsync(Ignite ignite, String param) {
	// Enable asynchronous mode.
	IgniteCluster cluster = ignite.cluster();
IgniteCompute asyncCompute =                                                       ignite.compute(cluster.forRemotes()).withAsync();
// Asynchronously execute a job.
	asyncCompute.call(() -> {
		System.out.println("processing: " + param);

ArrayList<String> Xpaths = new  ArrayList<String>(Arrays.asList("//title", "//td/h1"));
ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);

	System.out.format(" %d ) %s \n %n", count, "" + ResultList);
	return ""+param+" :" + ResultList;
	});

	// Get the future for the above invocation.
	IgniteFuture<String> fut = asyncCompute.future();

	// Asynchronously listen for completion and print out the result.
	fut.listen(f -> {
		String resultStr = f.get()+" \n";
		// System.out.println("Job result: " + resultStr);
		count++;
			try {
           Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
			} catch (IOException e) {
				System.out.println("" + e);
			}
			if (count%100==0) System.out.println( "processed: "+count   );
		});
	}


Важные моменты:
  • Нода представляет из себя папку с java программой и конфигурацией в виде default-config.xml файла. Стартует из ignite.bat.
    Папку с нодой копируете на произвольные машины в локальной сети. Можно запускать несколько инстансов ноды на одной машине.
    Скачать ноду (Они ее называют fabric'ой)
  • У всех нод должен быть один и тот же конфигурационный файл.
  • Ignite поддерживает концепцию peer class loading (Zero Deployment). Это означает, что вам не надо переписывать ваш проект на все ноды. Он сам задеплоится. Это очень крутая фича. Экономит кучу времени.
    Но надо включать эту фичу в конфигурационном файле. Пишут, что с выключенной фичей работает быстрее. Не проверял.
  • Вы можете запустить несколько нод на одной машине и эмулировать работу с кластером.
  • Да, в кластер можно добавлять и удалять ноды в процессе работы апп.
  • В конфигурационном файле надо указать IP-адреса машин в кластере
  • Нужна java 8, так как используются lambda
  • Если остановить master node, то таски, к-е она кастанула, на других машинах умрут.
  • Вы можете спрашивать у фреймворка данные о любой машине кластера: загрузка CPU, свободная память, количество job'ов, но я доверился визарду решающему на какую машину лучше кинуть job

В итоге, мне удалось уместить весь проект в один java class ~200 строк кода с комментариями. Классу нужны jar files с htmlcleaner и apache ignite.

Можно вместо html cleaner воспользоваться внешней утилитой Xidel. Она поддерживает XQuery и XPath.
Tогда надо ее прописать на всех машинах с нодами в системную переменную PATH а затем вызываьт прямо из java. Зато будете наслаждаться XQuery.

Если публикация вызовет интерес, то напишу еще про распределенный кэш, очереди и другие distributed штучки на этом фреймворке

Исходный код проекта для Eclipse

package gridE;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteFuture;
import org.htmlcleaner.CleanerProperties;
import org.htmlcleaner.DomSerializer;
import org.htmlcleaner.HtmlCleaner;
import org.htmlcleaner.TagNode;

import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Scanner;
import static java.nio.file.FileVisitResult.CONTINUE;

/**
 * Created by Veaceslav Kunitki on 11/13/2015.
 * This class parse files on cluster with "Apache Ignite" framework
 */

import static java.nio.file.FileVisitResult.*;

public class ParseFilesOnCluster extends SimpleFileVisitor<Path> {

	Ignite ignite; 
	public static long count = 0; //  counter of parsed files

	// Java standart FileTree walker 
	@Override
	public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {
		if (attr.isSymbolicLink()) {
			System.out.format("Symbolic link: %s ", file);
		} else if (attr.isRegularFile()) {
			// System.out.format("Regular file: %s ", file);

			if (file.toString().endsWith(".html") ) {
			//if (file.toString().endsWith(".html") ) { // uncomment it for serial  processing
				//ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
				// ArrayList<String>
				// ResultList=parseWithXPathList(file.toString(),Xpaths);
				// System.out.format(" %d ) %s %n", count,""+ResultList);
				castOneAsync(ignite, "" + file); // parallel processing
			}
		} else {
			System.out.format("Other: %s ", file);
		}
		return CONTINUE;
	}

	// Print each directory visited.
	@Override
	public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
		System.out.format("Directory: %s%n", dir);
		return CONTINUE;
	}

	@Override
	public FileVisitResult visitFileFailed(Path file, IOException exc) {
		System.err.println(exc);
		return CONTINUE;
	}

	static String readFile(String path, Charset encoding) throws IOException {
		byte[] encoded = Files.readAllBytes(Paths.get(path));
		return new String(encoded, encoding);
	}

	public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
		count++;
		String str = "";
		ArrayList list = new ArrayList();
		try {
			String content = readFile(path, StandardCharsets.UTF_8);
			TagNode tagNode = new HtmlCleaner().clean(content);
			org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
			// And then use the standard JAXP interfaces to query it:
			XPath xpath = XPathFactory.newInstance().newXPath();
			// String str = (String) xpath.evaluate("//div//td[contains(@id,
			// 'foo')]/text()",
			Iterator<String> it = XPathList.iterator();
			while (it.hasNext()) {
				String XPath = it.next();
				String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
				list.add(res);
			}

			// System.out.println(str);
		} catch (Exception e) {
			str = "" + e;
			list.add(str);
		}
		return list;
	}

	/*
	 * Asynchronously execute a job on external PC
	 */
	public static void castOneAsync(Ignite ignite, String param) {
		// Enable asynchronous mode.
		IgniteCluster cluster = ignite.cluster();
		// IgniteCompute compute1 = ignite.compute(cluster.forRemotes());

		IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();

		// Asynchronously execute a job.
		asyncCompute.call(() -> {
			// Print hello world on some cluster node and wait for completion.
			System.out.println("processing: " + param);
			ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//li/@data-zoom"));
			ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
			System.out.format(" %d ) %s \n %n", count, "" + ResultList);
			String text = new Scanner(new File(param.toString()), "UTF-8").useDelimiter("\\A").next();
			return "{ 'url':" + param + " ,'ResultList'=" + ResultList + " }";
		});

		// Get the future for the above invocation.
		IgniteFuture<String> fut = asyncCompute.future();

		// Asynchronously listen for completion and print out the result.
		fut.listen(f -> {
			String resultStr = f.get() + " \n";
			// System.out.println("Job result: " + resultStr);
			count++;

			try {
				Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND ); //Warning! File must be exist, do it manual!
			} catch (IOException e) {
				System.out.println("" + e);
			}
			if (count % 100 == 0)
				System.out.println("processed: " + count);
		});
	}

	public static void main(String[] args) throws Exception {
		System.out.println("# Distributed parser!");
		Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");	
		IgniteCluster cluster = ignite.cluster();
		// Compute instance over remote nodes.
		IgniteCompute compute4remote = ignite.compute(cluster.forRemotes());
		// Print hello message on all remote nodes.
		compute4remote.broadcast(
				() -> System.out.println("---===Distributed parser started===---: " + cluster.localNode().id()));
		System.out.println( "Cluster ready!"   );
		if (true) { // start parsing job
			// final Path startingDir = Paths.get("d:/home/familytree.ru/");
			Path startingDir = Paths.get("\\\\SERGIU-PC\\temp"); // shared directory with HTML-files
			EnumSet<FileVisitOption> opts = EnumSet.of(FileVisitOption.FOLLOW_LINKS);
			ParseFiles parseFiles = new ParseFiles();
			parseFiles.ignite = ignite;
			// log time to file
			PrintWriter writer = new PrintWriter("d:\\grid\\start.txt", "UTF-8");			
			String dateTime = "" + (new Date());
			writer.println(dateTime + "\n");
			System.out.println(dateTime + "\n");
			writer.close();
			System.out.println("# walking...!");
			Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
			// log end time			
			dateTime = "" + (new Date());
			Files.write(Paths.get("d:\\grid\\start.txt"), dateTime.getBytes(), StandardOpenOption.APPEND);
		}
	}
}



POM- file с зависимостями проекта

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>gridE</groupId>
	<artifactId>gridE</artifactId>
	<version>0.0.1-SNAPSHOT</version>


	<repositories>
		<repository>
			<id>GridGain External Repository</id>
			<url>http://www.gridgainsystems.com/nexus/content/repositories/external</url>
		</repository>
	</repositories>


	<dependencies>

		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-core</artifactId>
			<version>1.4.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>1.1.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-indexing</artifactId>
			<version>1.4.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-examples</artifactId>
			<version>1.0.0-RC1</version>
		</dependency>
		<dependency>
			<groupId>net.sourceforge.htmlcleaner</groupId>
			<artifactId>htmlcleaner</artifactId>
			<version>2.15</version>
		</dependency>
	</dependencies>





</project>


Конфигурационный файл для ноды

<?xml version="1.0" encoding="UTF-8"?>

<!--
    _________        _____ __________________        _____
    __  ____/___________(_)______  /__  ____/______ ____(_)_______
    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __     / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
 Copyright (C) GridGain Systems. All Rights Reserved.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
     http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<!--
    Ignite Spring configuration file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-2.0.xsd">
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable grid-aware class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="marshaller">
            <bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller">
                <!-- Set to false to allow non-serializable objects in examples, default is true. -->
                <property name="requireSerializable" value="false"/>
            </bean>
        </property>

        <!-- Enable events for examples. -->
        <property name="includeEventTypes">
            <util:constant static-field="org.apache.ignite.events.EventType.EVTS_ALL"/>
        </property>


        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Uncomment multicast IP finder to enable multicast-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
				<value>192.168.4.110:47500..47509</value>
				<value>192.168.4.117:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>



   <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>
            
            <!-- Set cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>
 
        </bean>
    </property>



    </bean>
</beans>

Комментарии (14)


  1. sergeypid
    23.11.2015 18:20
    +1

    Как Ignite соотносится с Akka? Похоже, что Ignite < Akka, но вопрос рекомендаций — в каких случаях применять Ignite?


    1. tumikosha
      23.11.2015 19:18
      +1

      Я не знаю Akka, но читал сравнения
      Разница в концепте:
      Akka — distributed actors
      Ignite — distributed closures

      Akka и Erlang для реалтайма. Они быстро гоняют сообщения между нодами.
      Т.е. используются в Massively Concurrent Application.

      Ignite гоняет код между нодами, т.е. это для MapReduce и BigData
      В Ignite есть Zero Deployment. (нет необходимости деплоить код на все ноды, сам расползается)

      В Erlang нет zero deploymenta. (Но можно сделать через метапрограмминг)
      Не знаю есть ли он в Akka. Думаю что нет.

      В Ignite есть много другой функциональности: distributed caching, service grid, streaming, etc.
      Затрудняюсь ответить есть ли она в Akke.


    1. kefirr
      23.11.2015 20:10
      +1

      Ignite и Akka слишком разные продукты, чтобы знаки неравенства между ними ставить. Достаточно взглянуть на API.
      Ignite — это распределенная SQL+noSQL БД в памяти + map/reduce и прочие виды кластерных вычислений.
      Akka решает несколько другие задачи, насколько я понимаю.


      1. bitec
        25.11.2015 14:12
        +2

        Тысячу лет назад GridGain позиционировался как IMDG — грубо говоря распределенная мапа, «все данные в памяти». Прошло много лет, они уже ушли от этого термина, но в-целом главный юзкейс остается прежним — данные, распределенные по узлам в виде ключ-значение (лучшего пока не придумали), которые можно обрабатывать локально на узле. Если бы автор не использовал расшаренный диск, то правильным сценарием было бы — распределить данные по узлам, запустить код, обрабатывающий данные, а локальность (affinity) была бы достигнута автоматически

        П.с. не туда ответил, веткой выше хотел


  1. ksm
    23.11.2015 21:45
    +1

    Маленькие поправки внесу для pom.xml: секция repositories не нужна, зависимости вытягиваются из апачевского репозитория; для ignite-spring и ignite-examples м.б. версия 1.4.0 (не уверен реально нужна ли зависимость на examples)?


  1. Assargin
    24.11.2015 11:43
    +1

    Так а сколько по времени занял тот же объём работы, но уже распределённо?


    1. tumikosha
      24.11.2015 13:59
      +2

      ~3 часа. На 2 компах (4 ядра каждый. 16 Gb Ram) запущено было 12 нод. По несколько нод на каждом компе


  1. bald2b
    24.11.2015 15:24

    Для распараллеливания выполнения процессов можно использовать еще gearman


    1. tumikosha
      24.11.2015 17:11

      Спасибо. Интересная штука.
      Думаю вот поэкспериментировать с распараллеливанием Selenium RC


  1. kuaw26
    24.11.2015 17:21
    +1

    Интересно, пиши еще про другие распределенные штуки, только исходники можно в спойлеры заворачивать?


    1. tumikosha
      25.11.2015 22:27

      ок. Мне кажется что если я исправлю текст, то у меня карма обнулится в recovery mode


  1. bitec
    25.11.2015 14:08

    Спасибо за статью, главное преимущество GG — их специализированный classloading through network, в противном случае деплоймент был бы геморойным.

    Только вот что такое «кастовать»? Ужасное слово, режет слух, во всем программном мире «cast» понимается как «приводить тип» все-таки, а не что-то иное


    1. tumikosha
      25.11.2015 20:12

      cast spell- бросить заклинание
      в смысле «выполнить код на другой машине»
      да, жаргонизм канечно же


    1. tumikosha
      25.11.2015 22:26

      Кстати, по умолчанию у них classloading выключен. Надо включать. Свое главное преймущество скрывают. Поди узнай о нем, вот и написал статью чтобы восполнить этот пробел