Привет! Меня зовут Артем Гаврилов и я работаю в Tarantool. Сегодня я расскажу, как быстро создать объектное хранилище на основе платформы in-memory вычислений Tarantool и распределённой файловой системы IPFS (InterPlanetary File System).


Мы рассмотрим пример шардирования стороннего приложения с помощью Tarantool и сделаем MVP объектного хранилища с отказоустойчивостью на уровне ЦОДа, в то время как более простые решения отказоустойчивы только на уровне нескольких серверов.


Тем, кто знаком с IPFS, вероятно, будет интересно читать начиная с раздела «С чем мы столкнёмся».


IPFS — это вообще что?


Если кратко, IPFS (от англ. InterPlanetary File System) — это полностью распределённое файловое хранилище. Оно обладает несколькими характерными свойствами:


  • Доступ к данным по их хешу.
  • Управление и доступ к файлам через протокол HTTP.
  • Все файлы на ноде делятся на закреплённые (pinned) и незакреплённые. Различие между ними в том, что первые никогда не удаляются из ноды. По-настоящему нода хранит именно закреплённые файлы, а все остальные — это просто кеш на диске.
    При отсутствии файла на конкретной ноде IPFS скачает его с другой. Закрепление делается вручную через API.
  • Если запросить у сервера файл, который на нём не хранится, то сервер сначала скачает его как незакреплённый, а потом отдаст по HTTP. А если ему не хватит места, то вытеснит один из незакреплённых файлов.
  • Хранилище публично, все ноды связаны и образуют глобальный кластер. Однако данные, которые никто не запрашивает, по логике предыдущего пункта со временем уйдут из всех нод и навсегда потеряются.

Таким образом каждая нода IPFS является сразу шлюзом в HTTP, кешем и холодным хранилищем. Из этого следует, что система сама может переносить данные между нодами. Но IPFS нужна помощь в выборе того, как складывать данные для холодного хранения. Также крайне желательно всегда обращаться к серверу, на котором закреплён нужный файл.


Почему мы выбрали IPFS?


На то есть несколько причин.


  1. IPFS — законченный продукт, готовый хранить большие объёмы данных.
  2. IPFS широко применяется в различных сферах. Так, например, он используется криптовалютой Ethereum для реализации NFT токенов: в блокчейне сохраняются только 256 бит хеша файла, а сам файл публикуется в IPFS, не занимая место в блокчейне.
  3. Некоторые крупные компании используют публичные серверы, для кеширования своих данных (или для обхода блокировок). Например, Cloudflare.
  4. IPFS также устраняет некоторые проблемы безопасности. В частности, адресация на основе содержимого позволяет хранить и рассчитывать хеши в метаданных без создания отдельного решения.

Подробне про преимущества IPFS можно почитать здесь.


При чём тут Tarantool?


С помощью Tarantool (точнее, Tarantool Cartridge) мы будем шардировать данные и искать IPFS-ноды, на которых эти данные расположены.


Tarantool подходит для этой задачи, так как:


  • У платформы есть готовый механизм шардирования (vshard), к которому можно добавить свою логику.
  • Tarantool может общаться по HTTP с IPFS.

Кроме того, платформа Tarantool — это база данных со встроенным сервером приложений, что также сильно упростит реализацию.


Почему не IPFS Cluster?


Существует нативное решение IPFS Cluster, однако у него есть недостатки. Он просто считает количество копий закреплённых данных, а не распределяет их по наборам метрик, как типовой модуль шардирования Tarantool vshard. Это влияет на уровень отказоустойчивости: если собирать набор реплик из серверов в разных ЦОДах, то можно добиться отказоустойчивости даже при проблеме с одним из ЦОДов (а такие бывают). Помимо прочего, IPFS-кластер ограничивает нас отсутствием метаданных и поисковыми индексами только по хешу.


С чем мы столкнёмся?


Длительные операции
Работа с большими файлами — всегда долгая операция. При этом триггеры также накладывают ограничение: они должны выполняться в рамках одной транзакции и как следствие не могут делать сетевые вызовы. Поэтому надо будет предусмотреть асинхронное обновление.


Асинхронная обработка предполагает работу с очередями, и для этого мы будем использовать библиотеку Tarantool-queue, которая позволяет заводить локальные (не распределённые) очереди задач.


Большой объём трафика


Надо будет очень аккуратно пользоваться HTTP, не читая лишний раз тело запроса и активно пользуясь редиректами.


Сторонний API, под который надо подстроиться


IPFS — готовый продукт, менять исходный код которого мы не можем. Это означает, что наши возможности ограничиваются возможностями его API.


Базовая концепция решения


Мы будем придерживаться такой концепции:


  • Разворачиваем рядом с каждым хранилищем Tarantool ноду IPFS.
  • Управляем IPFS-нодой только с локального хранилища.
  • При этом на IPFS-ноде лежат те же объекты, что и в локальном хранилище.
  • Задачи, связанные с IPFS, выполняем в очередях.
  • Когда мы хотим закрепить или открепить новый файл, мы запишем данные о нём в хранилище и поставим в соответствующую очередь задачу.

В конечном итоге получится такая топология:



Минимальный вариант: поиск по хешам


Схема данных и очереди задач


Делаем две записи о хеше:


  1. Шардированную — которая находится на тех серверах, на чьих локальных нодах IPFS мы хотим закрепить файлы (желаемое расположение данных).
  2. Нешардированную — которая соответствует последнему состоянию (закреплённости) в локальном хранилище (фактическое расположение данных).

Также сделаем две очереди: на закрепление (добавление) и открепление (удаление) данных:


Код
local queue = require('queue')

local function init_schema(is_master)
  local pinned = box.schema.space.create('pinned',{format={
      {'ipfs_id', 'string'},
      {'bucket_id', 'unsigned'}
    }, if_not_exists=true})

  pinned:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true})
  pinned:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true})

  if is_master then
    queue.create_tube('ipfs_add', 'fifottl', {if_not_exists = true})
    queue.create_tube('ipfs_remove', 'fifottl', {if_not_exists = true})
  end

  local pinned_no_shard = box.schema.space.create('pinned_no_shard',{format={
      {'ipfs_id', 'string'},
      {'status', 'string'}
    }, if_not_exists=true})

  pinned_no_shard:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true})
end

return {
  init_schema=init_schema
}

Задачи на взаимодействие с IPFS будем ставить при записи в шардированную очередь, а при их выполнении будем обновлять данные в нешардированной.


Немного технических полезностей


Небольшой модуль для взаимодействия с IPFS


Нам понадобятся:


  • чтение конфигурации IPFS (чтобы узнать, куда кидать запрос);
  • get_gateway_address — функция для получения адреса шлюза; не мудрим, просто добавим в конфигурацию IPFS-ноды свой параметр gateway_pub_address;
  • управление закреплением файлов(добавление и удаление).

Код
local http = require('http.client').new()
local json = require('json')

local api_prefix = 'http://127.0.0.1:5001'

local function get_config()
  local resp = http:post(api_prefix..'/api/v0/config/show')
  if resp.status == 200 then
    return json.decode(resp.body)
  else
    error(resp.reason)
  end
end

local function get_address()
  local config, err = get_config()
  if err ~= nil then
    return nil, err
  end

  return assert(config.gateway_pub_address, 'no gateway_pub_address in ipfs config')
end

local function pin_add(ipfs_id)
  local resp = http:post(api_prefix..'/api/v0/pin/add?arg='..ipfs_id)
  assert(resp.status == 200, resp.reason)
end

local function pin_rm(ipfs_id)
  local resp = http:post(api_prefix..'/api/v0/pin/rm?arg='..ipfs_id)
  assert(resp.status == 200, resp.reason)
end

return {
  get_config = get_config,
  get_address = get_address,
  pin_add = pin_add,
  pin_rm = pin_rm
}

Выполнение операции на всём наборе реплик одновременно


Наша очередь, а также спейс с нешардированными данными будут синхронно реплицироваться на весь набор реплик. При этом для полного повторения топологии нам надо положить рядом с каждым экземпляром Tarantool экземпляр IPFS. Поэтому при выполнении задачи из очереди надо вызывать функцию на всём наборе реплик. Для этого напишем небольшую функцию:


local function get_curr_replicaset()
  local me = cartridge_lua_api_topology.get_self()
  return cartridge.admin_get_servers(me.uuid)[1].replicaset
end

local function call_on_whole_replicaset(func, params, options)
  local servers_uri = {}
  for _, server in pairs(get_curr_replicaset().servers) do
    table.insert(servers_uri, server.uri)
  end

  local map_results, map_errors = cartridge_pool.map_call(func, params, {uri_list = servers_uri, timeout = options.timeout})
  assert(map_errors == nil, map_errors)
  return map_results
end

Как сделать отложенную синхронизацию с IPFS


Покажем в картинках, как будет происходить работа с асинхронным данными. Обратите внимание, что помимо простых операций записи и удаления у нас есть операция решардинга.Чтобы её правильно выполнять, будем при обычном удалении делать пометки в нешардируемом спейсе. Если пометка есть, то можно удалять сразу, иначе надо дождаться закрепления пина на новом хранилище.


Добавление



Удаление



Решардинг



Реализация


Нам понадобится обрабатывать следующие запросы к роутеру:


  1. HTTP-запрос, перенаправления на объект в IPFS: GET /tnt_ipfs/pin/:ipfs_id
  2. HTTP-запрос на добавление пина: PUT /tnt_ipfs/pin/:ipfs_id
  3. HTTP-запрос на удаление пина: DELETE /tnt_ipfs/pin/:ipfs_id
  4. HTTP-запрос на получение статуса пина: GET /tnt_ipfs/pin/:ipfs_id/status

А также следующие запросы к хранилищу:


  1. Запрос, который отдаст нам адрес соответствующего ему шлюза: ipfs.get_local_ipfs_addr
  2. Запрос на добавление пина: ipfs.pin_write
  3. Запрос на удаление пина: ipfs.pin_delete
  4. Запрос на получение статуса пина: ipfs.pin_status
  5. Запрос между хранилищами на вызов: ipfs.pin_addipfs.internal.pin_add
  6. Запрос между хранилищами на вызов: ipfs.pin_rmipfs.internal.pin_rm

Код для роли роутера


Код
local function init(opts) -- luacheck: no unused args
  local httpd = assert(cartridge.service_get('httpd'), "Failed to get httpd service")
  httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
      local ipfs_id = req:stash('ipfs_id')
      local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
      local gateway_addr, err = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr')
      if err ~= nil then
        return {
          status = 500,
          body = err
        }
      end
      return req:redirect_to(gateway_addr..'/ipfs/'..ipfs_id)
    end)

  httpd:route({method = 'PUT', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
      local ipfs_id = req:stash('ipfs_id')
      local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
      local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_write', {{ipfs_id, bucket_id}})
      if err ~= nil then
        return {
          status = 500,
          body = err
        }
      end
      return {status = 200}
    end)

  httpd:route({method = 'DELETE', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
      local ipfs_id = req:stash('ipfs_id')
      local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
      local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {ipfs_id})
      if err ~= nil then
        return {
          status = 500,
          body = err
        }
      end
      return {status = 200}
    end)

  httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id/status', public = true}, function(req)
      local ipfs_id = req:stash('ipfs_id')
      local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
      local pin_status, err = vshard.router.callro(bucket_id, 'ipfs.pin_status', {ipfs_id})
      if err ~= nil then
        return {
          status = 500,
          body = err
        }
      end
      return {
        status = 200,
        body = pin_status
      }
    end)

  return true
end

Код для роли хранилища


Тут всё немного сложнее:


  1. Напишем в отдельном файле реализацию всех API-запросов:

Код
local ipfs = require('app.ipfs')

local function get_local_ipfs_addr()
  return ipfs.get_address()
end

local function pin_write(tuple)
  box.space.pinned:put(tuple)
end

local function pin_delete(ipfs_id)
  box.space.pinned_no_shard:put({ipfs_id, 'CAN_UNPIN'})
  box.space.pinned:delete(ipfs_id)
end

local function pin_status(ipfs_id)
  if box.space.pinned:get(ipfs_id) ~= nil then
    -- Want to pin
    if box.space.pinned_no_shard:get(ipfs_id) ~= nil then
      return 'Pinned'
    else
      return 'Wait pining'
    end
  else
    -- Want to unpin
    local pns = box.space.pinned_no_shard:get(ipfs_id)
    if pns ~= nil then
      if pns.status == 'CAN_UNPIN' then
        return 'Wait unpining'
      else
        return 'Wait reshard'
      end
    else
      return 'Not found'
    end
  end
end

return {
  ipfs = {
    get_local_ipfs_addr = get_local_ipfs_addr,
    pin_write = pin_write,
    pin_delete = pin_delete,
    pin_status = pin_status,

    internal = {
      pin_add = ipfs.pin_add,
      pin_rm = ipfs.pin_rm
    }
  }
}

  1. Нам понадобится триггер для постановки задач в очередь:

Код
local function on_replace(old, new, s, op)
  if old == nil then
    if new == nil then
      -- ничего не делаем
    else
      queue.tube.ipfs_add:put(new.ipfs_id, {delay=1})
    end
  else
    if new == nil then
      queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1})
    else
      if old.ipfs_id ~= new.ipfs_id then
        queue.tube.ipfs_add:put(new.ipfs_id, {delay=1})
        queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1})
      end
    end
  end
end

  1. Ещё нам понадобятся воркеры для обработки задач:

Код
local ran = false

local function add_queue_worker()
  while ran do
    local task = queue.tube.ipfs_add:take()

    -- проверяем, актуально ли задание
    local p = box.space.pinned:get(task[3])
    local pns = box.space.pinned_no_shard:get(task[3])
    if p == nil or pns ~= nil then
      queue.tube.ipfs_add:ack(task[1])
    else
      -- пытаемся закрепить
      local ok, err = pcall(function()
          call_on_whole_replicaset('ipfs.internal.pin_add', {task[3]})
        end)

      if ok then -- удалось
        box.space.pinned_no_shard:put({task[3], 'PINNED'})
        queue.tube.ipfs_add:ack(task[1])
      else -- не удалось
        log.error(err)
        queue.tube.ipfs_add:release(task[1], {delay=10})
      end
    end
  end
end

local function rm_queue_worker()
  while ran do
    local task = queue.tube.ipfs_remove:take()

    -- проверяем, актуально ли задание
    local p = box.space.pinned:get(task[3])
    local pns = box.space.pinned_no_shard:get(task[3])
    if p ~= nil or pns == nil then
      queue.tube.ipfs_add:ack(task[1])
    else
      local ok, err

      -- проверяем, появился ли объект на новом месте
      ok, err = pcall(function()
          local pns = box.space.pinned_no_shard:get(task[3])
          if pns and pns.status == 'PINNED' then
            -- проверяем, появился ли пин на новом месте
            local bucket_id = vshard.router.bucket_id_mpcrc32(task[3])
            assert(vshard.router.callro(bucket_id, 'ipfs.pin_status', {task[3]}) == 'Pinned')
          end
        end)

      if ok == nil then --рано откреплять
        queue.tube.ipfs_remove:release(task[1], {delay=10})
      else -- пытаемся открепить
        ok, err = pcall(function()
          call_on_whole_replicaset('ipfs.internal.pin_rm', {task[3]})
        end)

        if ok ~= nil then -- удалось
          box.space.pinned_no_shard:delete(task[3])
          queue.tube.ipfs_remove:ack(task[1])
        else -- не удалось
          log.error(err)
          queue.tube.ipfs_remove:release(task[1], {delay=10})
        end
      end
    end
  end
end

  1. Добавим функции для включения и выключения триггера и воркеров:

Код
local function activate()
  if not ran then
    -- запускаем воркеры
    ran = true
    fiber.create(add_queue_worker)
    fiber.create(rm_queue_worker)

    -- включаем триггеры
    if #box.space.pinned:on_replace() == 0 then
      box.space.pinned:on_replace(on_replace)
    end
  end
end

local function deactivate()
  if ran then
    -- останавливаем воркеры
    ran = false

    -- выключаем триггеры
    if #box.space.pinned:on_replace() == 1 then
      box.space.pinned:on_replace(nil, on_replace)
    end
  end
end

  1. Ну и наконец запихнём в роль включение триггера и воркеров только для мастера:

Код
local function init(opts) -- luacheck: no unused args
  init_schema.init_schema(opts.is_master)

  rawset(_G, 'ipfs', storage_api)

  if opts.is_master then
    storage_master_service.activate()
  else
    storage_master_service.deactivate()
  end

  return true
end

local function stop() -- luacheck: no unused args
  rawset(_G, 'ipfs')

  storage_master_service.deactivate()

  return true
end

local function apply_config(conf, opts) -- luacheck: no unused args
  init_schema.init_schema(opts.is_master)

  if opts.is_master then
    storage_master_service.activate()
  else
    storage_master_service.deactivate()
  end
  return true
end

Запускаем и тестируем


Запускать будем в Docker. При этом в один контейнер поместим сразу и Tarantool, и IPFS (по одному экземпляру). Для этого воспользуемся cartridge pack docker и следующим Docker-файлом:


FROM centos:7

RUN yum -y update; yum -y install wget; \
    wget https://dist.ipfs.io/go-ipfs/v0.12.2/go-ipfs_v0.12.2_linux-amd64.tar.gz; \
    tar -xvzf go-ipfs_v0.12.2_linux-amd64.tar.gz; \
    cd go-ipfs; \
    ./install.sh

Теперь с помощью docker-compose соберём кластер (полный Docker-файл можно посмотреть в репозитории):


Код
version: '2.1'

services:
  Tarantool-ipfs1:
    image: tnt_ipfs:0.1.0.0
    container_name: Tarantool-ipfs1
    environment:
      - IPFS_PATH=/data-ipfs
      - CARTRIDGE_DATA_DIR=/data
      - TARANTOOL_INSTANCE_NAME=r1-router
      - TARANTOOL_ADVERTISE_URI=172.19.1.1:3301
      - TARANTOOL_CLUSTER_COOKIE=secret
      - TARANTOOL_HTTP_PORT=8001
    ports:
      - 4001:4001 # IPFS p2p
      - 5001:5001 # IPFS API
      - 8081:8081 # IPFS Gateway
      - 3301:3301 # Tarantool Iproto
      - 8001:8001 # Tarantool HTTP
    volumes:
      - ../data/Tarantool-ipfs1:/data
      - ../data/Tarantool-ipfs1-ipfs:/data-ipfs
    networks: # Нужно. чтобы Tarantool не ругался при запуске на смену IP
      main:
        ipv4_address: 172.19.1.1

...

networks:
  main:
    ipam:
      config:
        - subnet: 172.19.1.0/16

Чтобы IPFS запускалась вместе с Tarantool, добавим в начало init.lua строку require('os').execute('ipfs daemon --init --writable --enable-gc &')


Остаётся сконфигурировать IPFS. Сделать это можно только из Docker, так как по умолчанию API принимает запросы только от 127.0.0.1. Также надо указать IP, на котором мы открывали порт для входящих соединений.


#!/usr/bin/env bash

my_ip="X.X.X.X"

for i in 1 2 3 4 5; do
  docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=gateway_pub_address&arg=http://192.168.1.69:808$i"
  docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.API&arg=/ip4/0.0.0.0/tcp/5001"
  docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Announce&arg=\[\"/ip4/$my_ip/tcp/400$i\"\]&json=1"
  docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Gateway&arg=/ip4/0.0.0.0/tcp/8081"
  docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config/show"
  docker-compose restart "Tarantool-ipfs$i"
done

Заработало. Теперь можно поиграть, получив, например, вот этот объект: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu. При этом IPFS будет поначалу довольно долго его искать, прежде чем закрепить, но закрепив, будет отдавать мгновенно.


И ещё немного скриншотов. Для REST API я предпочитаю использовать Insomnia. Файлик с коллекцией запросов в конце статьи.


Для теста мы будем использовать вот этот хеш: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu.


  1. Получим по API IPFS список закреплённых сейчас файлов. Изначально он не пустой, это нормально.
  2. Посмотрим статус закрепления: GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu/status.

    Как и ожидалось, о закреплении пока ничего не известно.
  3. Воспользуемся нашим API и закрепим наш объект PUT /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu.
  4. Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет Wait pining, но потом станет Pinned:



  5. Снова получим по API IPFS список закреплённых сейчас файлов.

    Файл закреплён и теперь точно никуда не уйдёт из хранилища.
  6. Посмотрим на сам файл GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu:

    Если раскрыть раздел Timeline, то можно увидеть тот самый редирект.
  7. Попробуем открепить DELETE /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu:
  8. Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет Wait pining, но потом станет Pinned:




Обратите внимание, что GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu вполне успешно выполнится, даже если файл не закреплён. Однако, если вы ранее не запрашивали или не закрепляли файл, запрос будет выполняться дольше. Дело в том что IPFS Gateway будет пытаться найти файл на чужих нодах. А незакрепленный файл со временем может быть вытеснен из нашей ноды. И если при этом никто в интернете его не сохранит, то он и вовсе будет безвозвратно утрачен.


Добавляем индексный слой


Если вы не знакомы с индексными слоями, то общие идеи их построения можно посмотреть здесь: https://habr.com/ru/company/vk/blog/657789/.


Схема данных


Индексный слой будет состоять из следующих записей:


path bucket_id name is_dir ipfs_id
полный путь до родительского каталога bucket_id имя является ли запись каталогом хеш в IPFS

*bucket_id строится по path


Таким образом можно будет рекурсивно искать файлы.


Сразу добавим в код создание спейсов:


  local objects = box.schema.space.create('objects',{format={
      {'path', 'string'},
      {'bucket_id', 'unsigned'},
      {'name', 'string'},
      {'is_dir', 'boolean'},
      {'ipfs_id', 'string', is_nullable = true}
    }, if_not_exists=true})
  objects:create_index('primary', {parts={{1, 'string'},{3, 'string'}}, unique=true, if_not_exists=true})
  objects:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true})

Реализация


Нам понадобится обрабатывать такие запросы к роутеру:


  1. HTTP-запрос на добавление директории: PUT /tnt_ipfs/storage/*full_path?type=dir
  2. HTTP-запрос на добавление объекта: PUT /tnt_ipfs/storage/*full_path?type=object&ipfs_id=...
  3. HTTP-запрос, перенаправляющий на объект в IPFS или отдающий листинг директории: GET /tnt_ipfs/storage/*full_path
  4. HTTP-запрос на удаление директории или объекта: DELETE /tnt_ipfs/storage/*full_path

И запросы к хранилищу:


  1. Запрос на добавление директорий или объектов: storage.add
  2. Запрос на удаление директорий или объектов: storage.del
  3. Запрос на получение информации о директории или объекте: storage.get
  4. Запрос на листинг директории: storage.ls

Код для роли роутера


Код
local function convert_path(full_path)
  local path, name = full_path:match("^(.*)/(.+)$")
  if not path then
    return '', full_path
  end
  return path, name
end

...

  httpd:route({method = 'PUT', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
      local full_path = req:stash('full_path')
      local path, name = convert_path(full_path)
      local bucket_id = vshard.router.bucket_id_mpcrc32(path)
      local type = req:query_param('type')
      if type == 'dir' then
        local _, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, true})
        if err ~= nil then
          return {
            status = 500,
            body = err
          }
        end
        return {status = 200}
      elseif type == 'object' then
        local ipfs_id = req:query_param('ipfs_id')
        local bucket_id2 = vshard.router.bucket_id_mpcrc32(ipfs_id)
        local _, err = vshard.router.callrw(bucket_id2, 'ipfs.pin_write', {{ipfs_id, bucket_id2}})
        if err ~= nil then
          return {
            status = 500,
            body = err
          }
        end
        _, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, false, ipfs_id})
        if err ~= nil then
          return {
            status = 500,
            body = err
          }
        end
        return {status = 200}
      end
      return {status = 400}
    end)

  httpd:route({method = 'DELETE', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
      local full_path = req:stash('full_path')
      local path, name = convert_path(full_path)
      local bucket_id = vshard.router.bucket_id_mpcrc32(path)
      local deleted, err = vshard.router.callrw(bucket_id, 'storage.del', {path, name})
      if deleted == nil or err ~= nil then
        return {
          status = 500,
          body = err
        }
      end
      if deleted.is_dir then
        return {status = 200}
      else
        bucket_id = vshard.router.bucket_id_mpcrc32(deleted.ipfs_id)
        local _, err2 = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {deleted.ipfs_id})
        if err2 ~= nil then
          return {
            status = 500,
            body = err2
          }
        end
        return {status = 200}
      end
    end)

  httpd:route({method = 'GET', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
      local full_path = req:stash('full_path')
      local path, name = convert_path(full_path)
      local bucket_id = vshard.router.bucket_id_mpcrc32(path)
      local main_record, err
      if path == '' then --каталог в корне
        main_record = {is_dir=true}
      else
        main_record, err = vshard.router.callro(bucket_id, 'storage.get', {path,name})
        if err ~= nil then
          return {
            status = 500,
            body = err
          }
        end
        if main_record == nil then
          return {status = 404}
        end
      end
      if main_record.is_dir then
        bucket_id = vshard.router.bucket_id_mpcrc32(full_path)
        local result, err2 = vshard.router.callro(bucket_id, 'storage.ls', {full_path})
        if err2 ~= nil then
          return {
            status = 500,
            body = err2
          }
        end
        return {
          status = 200,
          body = result
        }
      else
        bucket_id = vshard.router.bucket_id_mpcrc32(main_record.ipfs_id)
        local gateway_addr, err2 = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr')
        if err2 ~= nil then
          return {
            status = 500,
            body = err2
          }
        end
        return req:redirect_to(gateway_addr..'/ipfs/'..main_record.ipfs_id)
      end
    end)

API хранилища


Код
local function storage_add(path,bucket_id,name,is_dir,ipfs_id)
  if is_dir then
    box.space.objects:put({path,bucket_id,name,true})
  else
    box.space.objects:put({path,bucket_id,name,false,ipfs_id})
  end
end

local function storage_del(path,name)
  local data = box.space.objects:delete({path,name})
  assert(data ~= nil)
  return data:tomap({names_only=true})
end

local function storage_get(path,name)
  local data = box.space.objects:get({path,name})
  assert(data ~= nil)
  return data:tomap({names_only=true})
end

local function storage_ls(path)
  local result = {}
  for _,i in box.space.objects:pairs({path}) do
    table.insert(result, {name = i.name, is_dir = i.is_dir})
  end
  return result
end

Запускаем и тестируем


  1. Создадим каталог PUT /tnt_ipfs/storage/a?type=dir. Его мы увидеть не сможем, потому что он пустой.
  2. Создадим второй каталог внутри первого: PUT /tnt_ipfs/storage/a/b?type=dir.
  3. Посмотрим содержимое первого каталога: GET /tnt_ipfs/storage/a.
  4. Создадим объект внутри первого каталога: PUT /tnt_ipfs/storage/a/c?type=object&ipfs_id=Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu.
  5. Снова посмотрим содержимое первого каталога: GET /tnt_ipfs/storage/a.
  6. Посмотрим на сам файл: GET /tnt_ipfs/storage/a/c:

    Если в разделе Timeline всё тот же редирект.
  7. Попробуем удалять файлы и директории:
    DELETE /tnt_ipfs/storage/a/b:

    DELETE /tnt_ipfs/storage/a/с:

Смотрим, как всё классно. Обсуждаем, что получилось


Чем хорошо это решение?


Приложив не так много усилий, мы получили работающий MVP объектного хранилища. И
если заменить gateway_pub_address, то можно использовать серверы сторонних провайдеров для кеша ваших данных.


Решение гарантирует отказоустойчивость при проблемах с серверами в n-1 ЦОДах при условии, что все наборы реплик собраны из n хранилищ в разных ЦОДах. Также мы получили из коробки балансировку по всем нодам IPFS.


Как можно применить это на практике?


  • Хранить большие объёмы публичных данных.
  • Бекапить большие объёмы данных, уже залитых в IPFS. К примеру, NFT-токены записаны в блокчейне Ethereum как хеши, при этом сами файлы находятся в IPFS.
  • По описанным в статье принципам можно шардировать и другие приложения.

Чего нехватает, чтобы это решение можно было назвать Prod-Ready?


Есть много причин считать это решение, скорее, MVP, нежели Prod-Ready. Однако его доработка явно выходит за рамки статьи.


  • Инструменты. Настоящее Prod-Ready решение должно содержать инструменты для мониторинга, исправления ошибок и замера использования пространства.


  • Reference counter. По-хорошему, на закреплениях должен быть Reference counter, чтобы их можно было безопасно добавлять в поиск несколько раз (иначе при удалении одного места в поиске произойдёт открепление и данные могут потеряться).


  • Добавление метаданных. В настоящих объектных хранилищах есть метаданные, которые могут использоваться для поиска, управления длительностью хранения файлов и прочего.


  • Разграничение прав. Хоть это и публичное хранилище, управление правами на запись в поисковый слой и ограничение пространства для разных пользователей всё равно является полезной функциональностью.


  • Альтернативные варианты шардирования. Существует более продвинутые варианты шардирования, учитывающие объём данных, нагрузку на диск и сеть. Их также можно применить к конечному приложению, поставив для управления и поиска Tarantool. Однако это не будет простым повторением шардирования, что многократно усложнит задачу.



Репозиторий с полным кодом: https://github.com/Artem3213212/TNT-IPFS

Комментарии (3)


  1. Gabibovv
    11.08.2022 19:24
    -1

    Полезная статья!


  1. slimlv
    13.08.2022 10:06

    Что происходит при потере одной ноды со всеми данными ?


    1. Artem3213212 Автор
      13.08.2022 10:24

      Данные останутся на остальных нодах в репликасете. Если умер мастер, и включен failover то, репликасет сам выберет нового мастера и продолжит работу в штатном режиме. Если умер мастер и файловер выключен, то выбрать нового мастера можно руками. Если в репликасете остался один сервер, то рекомендуется или добавить в репликасет новые сервера или, если нет такой возможности и места хватает, можно выставить вес репликасета в 0 и данные распределяться по другим серверам. Все манипуляции описанные выше можно сделать через WEB UI тарантула.