image

Привет, Хабр! Карма слита из-за неосторожного комента под холиварной статьей, а значит нужно написать интересный (я надеюсь) пост и реабилитироваться.

Я несколько лет пользуюсь серверным telegram клиентом на php. И как многие пользователи — устал от постоянного роста потребления памяти. Некоторые сессии могут занимать от 1 до 8 гигабайт RAM! Поддержка баз данных была уже давно обещана, но подвижек в этом направлении не было. Пришлось решать проблему самому :) Популярность open source проекта, накладывала интересные требования на pull request:

  1. Обратная совместимость. Все существующие сессии должны продолжить работать в новой версии (сессия — это сериализованный инстанс приложения в файле);
  2. Свобода выбора БД. Возможность менять тип хранилища без потери данных и в любой момент, так как у пользователей разные конфигурации окружения;
  3. Расширяемость. Простота добавления новых типов баз данных;
  4. Сохранить интерфейс. Код приложения, работающий с данными, не должен меняться;
  5. Асинхронность. Проект использует amphp, поэтому все операции с базами должны быть неблокирующими;

За подробностями приглашаю всех под кат.

Что будем переносить


Большую часть памяти в MadelineProto занимают данные о чатах, пользователях и файлах. Например, в кэше пиров (peer), у меня насчитывается более 20 тысяч записей. Это все пользователи, которых когда либо видел аккаунт (включая участников всех групп), а так же каналы, боты и группы. Чем старше и активнее аккаунт, тем больше данных в будет в памяти. Это десятки и сотни мегабайт, и большая часть из них не используется. Но очищать кэш целиком нельзя, потому что телеграмм сразу будет жестко ограничивать аккаунт при попытках многократно получать одни и те же данные. Например, после пересоздания сессии на моем публичном демо сервере, телеграм в течении недели на большинство запросов отвечал ошибкой FLOOD_WAIT и ничего толком не работало. После того как кеш прогрелся все пришло в норму.

С точки зрения кода эти данные хранятся в виде массивов в свойствах пары классов.

Архитектура


Исходя из требований родилась схема:

  • Все «тяжелые» массивы заменяем на объекты реализующие ArrayAccess;
  • Для каждого типа базы данных создаем свои классы, наследующие базовый;
  • Обьекты создаются и записываются в свойства во время __consrtuct и __awake;
  • Абстрактная фабрика выбирает нужный класс для объекта, в зависимости от выбранной базы данных в настройках приложения;
  • Если в приложении уже есть другой тип хранилища, то считываем оттуда все данные и записываем массив в новое хранилище.

Проблемы асинхронного мира


Перым делом я создал интерфейсы и класс для хранения массивов в памяти. Это был вариант по умолчанию, идентичный по поведению старой версии программы. В первый вечер я был очень воодушевлен успехами прототипа. Код получался красивый и простой. Пока не обнаружилось, что нельзя использовать генераторы внутри методов интерфейса Iterator и внутри методов отвечающих за unset и isset.

Тут нужно пояснить, что amphp использует синтаксис генераторов для реализации асинхронности в php. Yield становится аналогом async… await из js. Если какой-то метод использует асинхронность, то что бы получить из него результат, нужно дождаться этого результата в коде с помощью yield. Например:

<?php

include 'vendor/autoload.php';

$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);

$MadelineProto->loop(function() use($MadelineProto) {
    $myAsyncFunction = function() use($MadelineProto): \Generator {
        $me = yield $MadelineProto->start();
        yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
    };

    yield $myAsyncFunction();
});

Если из строки
yield $myAsyncFunction();
убрать yield, то приложение будет завершено до того, как этот код будет выполнен. Мы не получим результат.

Добавить yield перед вызовом методов и функций не очень сложно. Но так как используется интерфейс ArrayAccess, то методы не вызываются напрямую. Например, unset() вызывает метод offsetUnset(), а isset() — offsetIsset(). Аналогичная ситуация с итераторами типа foreach при использовании интерфейса Iterator.

Добавление yield перед встроенными методами вызывает ошибку, так как эти методы не предназначены для работы с генераторами. Чуть подробнее в коментах: тут и тут.

Пришлось пойти на компромис и переписать код на использование собственных методов. К счастью таких мест было очень мало. В большинстве случаев массивы использовались для чтения или записи по ключу. Этот функционал с генераторами отлично подружился.

Получился такой интерфейс:

<?php

use Amp\Producer;
use Amp\Promise;

interface DbArray extends DbType, \ArrayAccess, \Countable
{
    public function getArrayCopy(): Promise;
    public function isset($key): Promise;
    public function offsetGet($offset): Promise;
    public function offsetSet($offset, $value);
    public function offsetUnset($offset): Promise;
    public function count(): Promise;
    public function getIterator(): Producer;

    /**
     * @deprecated
     * @internal
     * @see DbArray::isset();
     *
     * @param mixed $offset
     *
     * @return bool
     */
    public function offsetExists($offset);
}

Примеры работы с данными

<?php
...
//Чтение
$existingChat = yield $this->chats[$user['id']];

//Запись. 
yield $this->chats[$user['id']] = $user;
//Можно использовать без yield, тогда мы не ждем записи в базу и код выполняется дальше.
$this->chats[$user['id']] = $user;


//unset
yield $this->chats->offsetUnset($id);

//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
    [$key, $value] = $iterator->getCurrent();
    //обрабатываем элементы массива
}

Хранение данных


Самый простой способ хранить данные — в сериализованном виде. От использования json пришлось отказаться ради поддержки объектов. У таблицы две основных колонки: ключ и значение.

Пример sql запроса на создание таблицы:

            CREATE TABLE IF NOT EXISTS `{$this->table}`
            (
                `key` VARCHAR(255) NOT NULL,
                `value` MEDIUMBLOB NULL,
                `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (`key`)
            )
            ENGINE = InnoDB
            CHARACTER SET 'utf8mb4' 
            COLLATE 'utf8mb4_general_ci'

При каждом старте приложения мы пытаемся создать таблицу для каждого свойства. Telegram клиенты не рекомендуется рестартить чаще чем раз в несколько часов, так что у нас не будет нескольких запросов на создание таблиц в секунду :)

Так как primary ключ без автоинкремента, то и вставку и обновление данных можно делать одним запросом, как в обычном массиве:

INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value

На каждую переменную создается таблица с именем в формате %id_аккаунта%_%класс%_%имя_переменной%. Но при первом старте приложения никакого аккаунта еще нет. В таком случае приходится генерировать случайный временный id с префиксом tmp. При каждом запуске класс каждой переменной проверяет не появился ли id аккаунта. Если id есть, то таблицы будут переименованы.

Индексы


Структура базы данных максимально простая, что бы добавление новых свойств в будущем происходило автоматически. Нет никаких связей. Используются только PRIMARY индексы по ключу. Но бывают ситуации, когда нужно искать по другим полям.

Например, есть массив/таблица chats. Ключ в ней — это id чата. Но часто приходится искать по username. Когда приложение хранило данные в массивах, то поиск по username осуществлялся обычным перебором массива в foreach. Такой перебор работал с приемлемой скоростью в памяти, но не в базе. Поэтому была создана еще одна таблица/массив и соответствющее свойство в классе. В ключе username, в значении — id чата. Единственный минус такого подхода — приходится писать дополнительный код для синхронизации двух таблиц.

Кэширование


Локальная mysql работает быстро, но немного кеширования никогда не помешает. Особенно если одно и тоже значение используется несколько раз подряд. Например, сначала проверяем наличие чата в базе, а потом достаем из него какие-нибудь данные.

Был написан простенький велосипед trait.

<?php

namespace danog\MadelineProto\Db;

use Amp\Loop;
use danog\MadelineProto\Logger;

trait ArrayCacheTrait
{
    /**
     * Values stored in this format:
     * [
     *      [
     *          'value' => mixed,
     *          'ttl' => int
     *      ],
     *      ...
     * ].
     * @var array
     */
    protected array $cache = [];
    protected string $ttl = '+5 minutes';
    private string $ttlCheckInterval = '+1 minute';

    protected function getCache(string $key, $default = null)
    {
        $cacheItem = $this->cache[$key] ?? null;
        $result = $default;

        if (\is_array($cacheItem)) {
            $result = $cacheItem['value'];
            $this->cache[$key]['ttl'] = \strtotime($this->ttl);
        }

        return $result;
    }

    /**
     * Save item in cache.
     *
     * @param string $key
     * @param $value
     */
    protected function setCache(string $key, $value): void
    {
        $this->cache[$key] = [
            'value' => $value,
            'ttl' => \strtotime($this->ttl),
        ];
    }

    /**
     * Remove key from cache.
     *
     * @param string $key
     */
    protected function unsetCache(string $key): void
    {
        unset($this->cache[$key]);
    }

    protected function startCacheCleanupLoop(): void
    {
        Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
    }

    /**
     * Remove all keys from cache.
     */
    protected function cleanupCache(): void
    {
        $now = \time();
        $oldKeys = [];
        foreach ($this->cache as $cacheKey => $cacheValue) {
            if ($cacheValue['ttl'] < $now) {
                $oldKeys[] = $cacheKey;
            }
        }
        foreach ($oldKeys as $oldKey) {
            $this->unsetCache($oldKey);
        }

        Logger::log(
            \sprintf(
                "cache for table:%s; keys left: %s; keys removed: %s",
                $this->table,
                \count($this->cache),
                \count($oldKeys)
            ),
            Logger::VERBOSE
        );
    }
}

Особое внимание хочется обратить на startCacheCleanupLoop. Благодаря магии amphp инвалидация кеша максимально простая. Коллбэк запускается по указанному интервалу, проходит по всем значениям и сморит на поле ts, в котором хранится timestamp последнего обращения к этому элементу. Если обращение было более 5 минут назад (конфигурируется в настройках), то элемент удаляется. C помощью amphp очень легко реализовать аналог ttl из redis или memcache. Все это происходит в фоне и не блокирует основной поток.

C помощью кеша и асинхронности ускоряется не только чтение, но и запись.

Вот исходный код метода, записывающего данные в базу.

/**
     * Set value for an offset.
     *
     * @link https://php.net/manual/en/arrayiterator.offsetset.php
     *
     * @param string $index <p>
     * The index to set for.
     * </p>
     * @param $value
     *
     * @throws \Throwable
     */

    public function offsetSet($index, $value): Promise
    {
        if ($this->getCache($index) === $value) {
            return call(fn () =>null);
        }

        $this->setCache($index, $value);

        $request = $this->request(
            "
            INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value
        ",
            [
                'index' => $index,
                'value' => \serialize($value),
            ]
        );

        //Ensure that cache is synced with latest insert in case of concurrent requests.
        $request->onResolve(fn () => $this->setCache($index, $value));

        return $request;
    }

$this->request создает Promise, который записывает данные асинхронно. А операции с кешом происходят синхронно. То есть можно не дожидаться записи в базу и при этом быть уверенным, что операции чтения сразу начнут возвращать новые данные.

Очень полезным оказался метод onResolve из amphp. После завершения вставки данные будут еще раз записаны в кэш. Если какая-то операция записи запоздает и кэш и база начнут отличаться, то кэш обновится значением записанным в базу последним. Т.е. наш кэш снова станет консистентен с базой.

Исходный код


> Ссылка на pull request

А вот так просто другой пользователь добавил поддержку postgre. Потребовалось всего 5 минут, что бы написать инструкцию для него.

Количество кода можно было бы сократить, если вынести дублирующиеся методы в общий абстрактный класс SqlArray.

One more thing


Было замечено, что во время скачивания медиафайлов из telegram стандартный garbage collector php не справляется с работой и куски файла остаются в памяти. Обычно размер утечек совпадал с размером файла. Возможная причина: garbage collector автоматически срабатывает, когда накапливается 10 000 ссылок. В нашем случае ссылок было мало (десятки), но каждая могла ссылаться на мегабайты данных в памяти. Изучать тысячи строк кода с реализацией mtproto было очень лень. Почему бы сначала не попробовать элегантный костыль с \gc_collect_cycles();?

Удивительно, но он решал проблему. Значит, достаточно настроить периодический запуск очистки. К счастью, amphp дает простые инструменты для фонового выполнения через указанный интервал.

Очищать память каждую секунду показалось слишком просто и не очень эффективно. Остановился на алгоритме, который проверяет прирост памяти после последнего очищения. Очистка происходит, если прирост больше порогового значения.

<?php

namespace danog\MadelineProto\MTProtoTools;

use Amp\Loop;
use danog\MadelineProto\Logger;

class GarbageCollector
{
    /**
     * Ensure only one instance of GarbageCollector
     * 		when multiple instances of MadelineProto running.
     * @var bool
     */
    public static bool $lock = false;

    /**
     * How often will check memory.
     * @var int
     */
    public static int $checkIntervalMs = 1000;

    /**
     * Next cleanup will be triggered when memory consumption will increase by this amount.
     * @var int
     */
    public static int $memoryDiffMb = 1;

    /**
     * Memory consumption after last cleanup.
     * @var int
     */
    private static int $memoryConsumption = 0;

    public static function start(): void
    {
        if (static::$lock) {
            return;
        }
        static::$lock = true;

        Loop::repeat(static::$checkIntervalMs, static function () {
            $currentMemory = static::getMemoryConsumption();
            if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
                \gc_collect_cycles();
                static::$memoryConsumption = static::getMemoryConsumption();
                $cleanedMemory = $currentMemory - static::$memoryConsumption;
                Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
            }
        });
    }

    private static function getMemoryConsumption(): int
    {
        $memory = \round(\memory_get_usage()/1024/1024, 1);
        Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
        return (int) $memory;
    }
}