В статье в ознакомительных целях рассматривается процесс создания простого хранилища простых текстовых документов на базе Riak версии 2.1.1 и организация поиска по ним с помощью Riak Search (Yokozuna). В качестве клиентской библиотеки используется официальный клиент для Erlang.

Для начала представим, что у нас есть огромное количество таких документов:
  • title — заголовок;
  • body — содержимое;
  • tags — тэги;
  • created_at — время создания;
  • smiles — количество смайликов (плюсиков, лайков, как хотите)

и огромное количество пользователей, желающих их изменять. Кому интересно, начнём.

Я подразумеваю, что читатели уже имеют некоторое представление о Riak. Если нет, то лучше сначала почитать здесь и тут, или, конечно, официальную документацию.

Установка и начальная настройка


Для запуска Riak Search в системе должна быть установлена Java. Установить сам Riak в OSX можно через Homebrew, а Erlang, если потребуется, будет установлен автоматически:
brew install riak

Для наших учебных целей нет смысла разворачивать целый кластер, можно ограничиться всего одним узлом, поэтому перед запуском потребуется выполнить лишь минимальную настройку. В конфиге нужно активировать Riak Search:
## To enable Search set this 'on'.
##
## Default: off
##
## Acceptable values:
##   - on or off
search = on

Если Riak установлен с помощью Homebrew, то конфиг лежит здесь — /usr/local/Cellar/riak/2.1.1/libexec/etc/riak.conf

Еще потребуется увеличить лимит на открытые файлы, если этого не сделать, при запуске будет выведено предупреждение. В OSX Yosemite я сделал так:
echo kern.maxfiles=65536 >> /etc/sysctl.conf
echo kern.maxfilesperproc=65536 >> /etc/sysctl.conf
sudo sysctl -w kern.maxfiles=65536
sudo sysctl -w kern.maxfilesperproc=65536
echo ulimit -n 65536 65536 >> ~/.bash_profile
ulimit -n 65536 65536

Теперь можно запустить Riak:
riak console

И можно протестировать из другой shell-сессии:
riak-admin test

Ответ должен быть примерно таким: Successfully completed 1 read/write cycle to 'riak@127.0.0.1'

Подробнее об установке и настройках:



Riak Data Types


Eventually Consistent хранилища, к которым относится Riak, допускают возникновение т.н. Data Incostistent ситуаций, когда содержимое одного и того же ключа на разных репликах отличается. В зависимости от настроек, Riak может попытаться решить конфликты сам с помощью vector clocks или timestamps, или же переложить обязанность определить правильную версию значения на приложение, предоставив ему все имеющиеся версии (siblings). В реальной ситуации, если ваши документы будут редактироваться в многопользовательском режиме и реплицироваться на несколько узлов, мердж конфликтных данных может стать весьма непростой задачей. В этом случае, возможно, лучшим решением будет использование Riak Data Types (также известных как CRDT). Эта технология позволяет описать данные с помощью специальных типов, которые возьмут на себя решение задачи конвергентности данных кластера и освободят приложение от обязанностей по решению конфликтов.

Riak Data Types, на текущий момент, реализуют следующие пять типов CRDT:
  • flag — Битовый флаг. Доступные операции — снять, установить. Может использоваться только внутри map;
  • counter — Счётчик. Доступные операции — увеличить, уменьшить. Тоже может использоваться только внутри map;
  • register — Какое-либо значение (хранится как строка);
  • set — Множество значений. Доступные операции — добавить элемент, удалить элемент;
  • map — Контейнер для других типов. Позволяет хранить внутри себя флаги, счётчики, регистры, множества и вложенные map'ы. Доступные операции — добавить поле, удалить поле, а также операции для внутренних полей, соответствующие их типам

В соответствие с этими типами, будем представлять наши документы как map со следующей структурой:
  • title — register;
  • tags — set;
  • body — register;
  • created_at — register;
  • smiles — counter

Создадим и активируем bucket-type под названием documents-type для хранения наших документов:
riak-admin bucket-type create documents-type '{"props":{"datatype":"map"}}'
riak-admin bucket-type activate documents-type

Более подробная информация по Eventually Consistent, Riak Data Types и CRDT:



Riak Search


Мы хотим реализовать поиск наших документов по тэгам, по заголовку, по дате создания и по содержимому. Для этого мы будем использовать технологию Riak Search под кодовым названием Yokozuna, которая по своей сути является посредником между хранилищем Riak и поисковиком Apache Solr. Yokuzuna сама запускает и мониторит на каждом узле кластера отдельный JVM-процесс с Solr, передаёт ему поисковые запросы и изменения в данных.

Для того, чтобы Solr знал как индексировать наши документы, нам необходимо создать поисковую схему. Вообще Riak Search имеет и дефолтную схему на все случаи жизни — _yz_default, которую удобно использовать во время разработки, но для рабочего окружения лучше создать свою.

Так как структура данных у нас уже определена, мы создадим схему сразу. В схеме нужно перечислить поля документа, для каждого из них указать тип, необходимо ли строить по нему индекс и хранить его значения, чтобы потом вернуть их в поисковой выдаче. Также в схему нужно обязательно включить служебные поля Riak Search. Следует заметить, что, при использовании Riak Data Types, к названиям полей добавляется суффикс, соответствующий их типу. Таким образом, у нас получится следующее описание:
    <field name="title_register" type="string" indexed="true" stored="false" multiValued="fase" />
    <field name="body_register" type="text_ru" indexed="true" stored="false" multiValued="true" />
    <field name="tags_set" type="string" indexed="true" stored="false" multiValued="true" />
    <field name="created_at_register" type="tdate" indexed="true" stored="false" multiValued="false" omitNorms="true" />

Под спойлером полное содержимое файла со схемой:
docs_seacrh_schema.xml
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="schedule" version="1.5">
    <fields>
        <field name="title_register" type="string" indexed="true" stored="false" multiValued="fase" />
        <field name="body_register" type="text_ru" indexed="true" stored="false" multiValued="true" />
        <field name="tags_set" type="string" indexed="true" stored="false" multiValued="true" />
        <field name="created_at_register" type="tdate" indexed="true" stored="false" multiValued="false" omitNorms="true" />

        <!--Все остальные поля игнорируем-->
        <dynamicField name="*" type="ignored" />

        <!--Это обязательные поля для Riak Search-->
        <field name="_yz_id"   type="_yz_str" indexed="true" stored="true"  multiValued="false" required="true"/>
        <field name="_yz_ed"   type="_yz_str" indexed="true" stored="false" multiValued="false"/>
        <field name="_yz_pn"   type="_yz_str" indexed="true" stored="false" multiValued="false"/>
        <field name="_yz_fpn"  type="_yz_str" indexed="true" stored="false" multiValued="false"/>
        <field name="_yz_vtag" type="_yz_str" indexed="true" stored="false" multiValued="false"/>
        <field name="_yz_rk"   type="_yz_str" indexed="true" stored="true"  multiValued="false"/>
        <field name="_yz_rt"   type="_yz_str" indexed="true" stored="true"  multiValued="false"/>
        <field name="_yz_rb"   type="_yz_str" indexed="true" stored="true"  multiValued="false"/>
        <field name="_yz_err"  type="_yz_str" indexed="true" stored="false" multiValued="false"/>
    </fields>

    <uniqueKey>_yz_id</uniqueKey>

    <types>
        <fieldType name="string" class="solr.StrField" sortMissingLast="true" />
        <fieldType name="tdate" class="solr.TrieDateField" precisionStep="6" positionIncrementGap="0" sortMissingLast="true" />

        <fieldType name="text_ru" class="solr.TextField" positionIncrementGap="100">
            <analyzer>
               <tokenizer class="solr.StandardTokenizerFactory"/>
               <filter class="solr.LowerCaseFilterFactory"/>
               <filter class="solr.StopFilterFactory" ignoreCase="true" words="lang/stopwords_ru.txt" format="snowball" />
               <filter class="solr.SnowballPorterFilterFactory" language="Russian"/>
               <!-- less aggressive: <filter class="solr.RussianLightStemFilterFactory"/> -->
            </analyzer>
        </fieldType>

        <fieldtype name="ignored" stored="false" indexed="false" multiValued="true" class="solr.StrField" />

        <!-- YZ String: Used for non-analyzed fields -->
        <fieldType name="_yz_str" class="solr.StrField" sortMissingLast="true" />
    </types>
</schema>


Подробности по Riak Search:



Доступ к Riak из Erlang


Пришло время наконец подключиться к нашей ноде с помощью клиента для Erlang:
# скачаем и скомпилируем клиент:
git clone https://github.com/basho/riak-erlang-client.git
cd riak-erlang-client/
make
cd ..
# запустим REPL:
erl -pa riak-erlang-client/ebin riak-erlang-client/deps/*/ebin

Подключаемся
{ok, RiakPid} = riakc_pb_socket:start_link("127.0.0.1", 8087).
% Проверим соединение
pong = riakc_pb_socket:ping(RiakPid).

Создаём поисковую схему и индекс
{ok, Schema} = file:read_file("docs_search_schema.xml").
ok = riakc_pb_socket:create_search_schema(RiakPid, <<"documents-schema">>, Schema).
ok = riakc_pb_socket:create_search_index(RiakPid, <<"documents-index">>, <<"documents-schema">>, []).

Создаём bucket и назначаем поисковый индекс
ok = riakc_pb_socket:set_search_index(RiakPid, {<<"documents-type">>, <<"documents-bucket">>}, <<"documents-index">>).

Создаём новый документ
Map = riakc_map:new().

% Заголовок документа - регистр (register)
Map1 = riakc_map:update({<<"title">>, register},
                        fun(Reg) ->
                            riakc_register:set(<<"DocumentTitle">>, Reg)
                        end,
                        Map).

% Тело документа - тоже регистр (register)
Map2 = riakc_map:update({<<"body">>, register},
                        fun(Reg) ->
                            riakc_register:set(<<"Some Document Body">>, Reg)
                        end,
                        Map1).

% Тэги - множество (set)
Map3 = riakc_map:update({<<"tags">>, set},
                        fun(Set) ->
                            Set1 = riakc_set:add_element(<<"Tag One">>, Set),
                            Set2 = riakc_set:add_element(<<"Tag Two">>, Set1),
                            Set2
                        end,
                        Map2).

% Дата создания - регистр (register)
Map4 = riakc_map:update({<<"created_at">>, register},
                        fun(Reg) ->
                            % Cледует заметить что Solr понимает только даты в формате ISO8601.
                            % https://cwiki.apache.org/confluence/display/solr/Working+with+Dates.
                            ISODateFmtStr = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0BZ",
                            {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:universal_time(),
                            ISODate = list_to_binary(io_lib:format(ISODateFmtStr, [Year, Month, Day, Hour, Min, Sec])),

                            riakc_register:set(ISODate, Reg)
                        end,
                        Map3).

% Выполним операции по созданию нашего документа в хранилище
MapOperations = riakc_map:to_op(Map4).
ok = riakc_pb_socket:update_type(RiakPid, {<<"documents-type">>, <<"documents-bucket">>}, <<"DocumentKey">>, MapOperations).

Находим и получаем документы
% Загрузим определения записей Riak Client.
rr("riak-erlang-client/include/riakc.hrl").

% Пример запроса по заголовку тэгу, содержимому и дате создания
{ok, Results} = riakc_pb_socket:search(RiakPid, <<"documents-index">>, <<"title_register:DocumentTitle AND tags_set:\"Tag One\" AND body_register:Some AND created_at_register:[1972-05-20T17:33:18Z TO NOW]">>).

% Получение документов
Docs = Results#search_results.docs.
lists:foldr(fun({_Index, Doc}, Acc) ->
                {_, DocumentId} = lists:keyfind(<<"_yz_rk">>, 1, Doc),
                {ok, {map, Image, _, _, _}} = riakc_pb_socket:fetch_type(RiakPid, {<<"documents-type">>, <<"documents-bucket">>}, DocumentId),
                Image
            end,
            [],
            Docs).

Меняем документ
riakc_pb_socket:modify_type(RiakPid, fun(Map) ->
                                         % Обновим содержимое
                                         UpdatedMap1 = riakc_map:update({<<"body">>, register},
                                                                 fun(Register) ->
                                                                     riakc_register:set(<<"Новое содержимое">>, Register)
                                                                 end, Map),

                                         % Удалим тэг
                                         UpdatedMap2 = riakc_map:update({<<"tags">>, set},
                                                                 fun(Set) ->
                                                                     riakc_set:del_element(<<"Tag One">>, Set)
                                                                 end, UpdatedMap1),

                                         % Добавим 10 смайликов
                                         UpdatedMap3 = riakc_map:update({<<"smiles">>, counter},
                                                                 fun(Counter) ->
                                                                     riakc_counter:increment(10, Counter)
                                                                 end, UpdatedMap2),
                                         UpdatedMap3
                                     end,
                            {<<"documents-type">>, <<"documents-bucket">>}, <<"DocumentKey">>, []).

% Проверим
riakc_pb_socket:search(RiakPid, <<"documents-index">>, <<"body_register:\"Содержимое\"">>).

Удаляем документ
riakc_pb_socket:delete(RiakPid, {<<"documents-type">>, <<"documents-bucket">>}, <<"DocumentKey">>).

% Проверим
riakc_pb_socket:search(RiakPid, <<"documents-index">>, <<"body_register:\"Содержимое\"">>).

Больше информации по работе с клиентской библиотекой:



Пока на этом всё. Если эта статья окажется полезной, то в следующий раз напишу о том, как для всего этого сделать веб-интерфейс на базе cowboy.

Комментарии (0)