Использование монги в production — достаточно спорная тема.
С одной стороный все просто и удобно: положили данные, настроили репликацию, понимаем как шардировать базу при росте объема данных. С другой стороны существует достаточно много страшилок, Aphyr в своем последнем jepsen тесте сделал не очень позитивные выводы.


По факту оказывается, что есть достаточно много проектов, где mongo является основным хранилищем данных, и нас часто спрашивали про поддержку mongodb в окметр. Мы долго тянули с этой задачей, потому что сделать "осмысленный" мониторинг на порядок сложнее, чем просто собрать какие-то метрики и настроить какие-нибудь алерты. Нужно сначала разобраться в особенностях поведения софта, чтобы понять, какие именно показатели отслеживать.


Как раз про сложности и проблемы я и хочу рассказать на примере реализации мониторинга запросов к mongodb.


На любую базу данных нужно смотреть с трех сторон:


  • Мониторинг ресурсов сервера (процессор, память, дисковая подсистема, сеть). Тут нет ничего сложного, большинство систем мониторинга с этим справляются достаточно хорошо.


  • Мониторинг внутренностей БД (соединения, индексы, кэши, работа с диском, временные таблицы, репликация, сортировки, ...). Такие метрики обычно нужны для понимания, как перенастроить БД, каких ресурсов сервера не хватает, какие индексы создать.


  • Мониторинг запросов (сколько каких, какие запросы создают нагрузку, трафик, времена запросов). По нашему опыту большинство проблем с базой возникают при изменении профиля нагрузки/запросов от приложения, например:


    • появился какой-то новый неоптимальный запроса от приложения
    • изменились условия запроса и индекс перестал эффективно работать
    • выросла таблица и последовательное чтение перестало быть быстрым


Мы пока ограничились только мониторингом запросов.


Так как мы говорим о мониторинге, нас не интересует каждый конкретный запрос, мы скорее хотим все запросы сгруппировать по некоторому одинаковому плану выполнения (например postgresql в pg_stat_statements группирует запросы по реальному плану).


Для mongodb идентификатором запроса является тип запроса (find, insert, update, findAndModify, aggregate и другие), база данных, коллекция и bson документ с самим запросом.
Для простоты мы решили, что запросы можно сгруппировать, заменив все значения полей из запроса на "?" и отсортировав по полям.


Например запрос:


{"country": "RU", "city": "Moscow", "$orderby": {"age": -1}}

превращаем в


{country: ?, city: ?, $orderby: {age: ?}}

а потом сортируем по ключам


{$orderby: {age: ?}, city: ?, country: ?}

Скорее всего подобные запросы будут использовать одни и те же индексы вне зависимости от конкретных условий.


Следующий большой вопрос: как получать в реальном времени весь поток запросов.


Единственный штатный способ в mongodb — это profiler. Он записывает статистику по каждому запросу в ограниченную по размеру коллекцию (capped collection). Профайлер может записывать или только медленные запросы (если время исполнения больше заданного в slowOpThresholdMs) или записывать абсолютно все запросы. Во втором случае может просесть производительность самой mongodb.


К преимуществам данного подхода стоит отнести очень подробную статистику о выполнении каждого запроса.


Но для нас очень критично не оказать негативного влияния на производительность серверов наших клиентов, поэтому использовать профайлер в режиме записи всех запросов мы не можем. Только "медленных" запросов нам недостаточно, так как мы не увидим полной картины:


  • какие запросы создают наибольшую нагрузку на сервер
  • какие запросы создают основной входящий/исходящий трафик на сервер
  • по интересующим запросам посмотреть распределение времени ответа
  • каких запросов сколько в штуках в секунду

По нашему опыту проблемы чаще создают высокочастотные запросы, которые раньше выполнялись 1ms, а потом по какой-то причине стали выполняться к примеру 5ms. А запросы >100ms (дефолтный slowOpThresholdMs) обычно служебные (админка/статистика) и очень редкие.


Так как стандартный профайлер не подошел, мы стали копать в сторону сниффинга трафика. На первом этапе было необходимо выяснить ряд вопросов:


  • библиотеки для go (наш агент написан на golang) для сниффинга
  • производительность (сколько агент будет потреблять ресурсов при прослушивании большого потока трафика)
  • разбор протокола mongodb

Прототип нашего плагина mongodb был написан за несколько дней с использование библиотеки gopacket. Мы перехватывали пакеты через libpcap, разбирали протокол, bson документы десериализовались с использованием mgo.


Так как у нас нет инсталяции mongodb под нагрузкой, мы сделали стенд и запустили готовый benchmark. В нашем случае mongodb и грузилка жили на одной виртуальной машине с 2 ядрами и 2Gb памяти. По нагрузкой мы видели около 10 тысяч пакетов в секунду при трафике ~60Mbit/s.


Наш прототип под такой нагрузкой утилизировал около 70% одного процессорного ядра. Стало понятно, что необходимо профилировать и оптимизировать код. Тут стоит отдать должное стандартному профайлеру golang, нам не нужно было ничего изобретать, а просто тюнить самые прожорливые по CPU участки кода и стараться как можно меньше аллоцировать память для снижения нагрузки на GC.


В точности процесс оптимизации я уже воспроизвести не смогу, но приведу примеры самых значительных изменений:


bson.Unmarshal медленный


Bson документ запроса в mongo — это грубо говоря словарь, значения которого могут быть в том числе и такими же словарями.
Так как с самого начала мы решили, что будем нормализовывать запросы, можем вообще не читать значения элементов исходного словаря, если они не являются словарями.
Берем спецификацию и пишем свой примитивный десериализатор. В итоге получилась функция ~100 строк


для примера приведу кусок разбора элемента словаря
elementValueType, err = reader.ReadByte()
if err != nil {
    break
}
payload, err = reader.ReadBytes(nullByte)
if err != nil {
    break
}
elementName = string(payload)
switch elementValueType {
case bsonDouble, bsonDatetime, bsonTimestamp, bsonInt64:
    if _, err = reader.ReadN(8); err != nil {
        break
    }
case bsonString:
    l, err = reader.ReadInt()
    if err != nil {
        break
    }
    payload, err = reader.ReadN(l)
    if err != nil {
        break
    }
    elementValue = string(payload[:len(payload)-1])
case bsonJsCode, bsonDeprecated, bsonBinary, bsonJsWithScope, bsonArray:
    l, err = reader.ReadInt()
    if err != nil {
        break
    }
    if _, err = reader.ReadN(l - 4); err != nil {
        break
    }
case bsonDoc:
    elementValue, _, _, err = readDocument(reader)
    if err != nil {
        break
    }
case bsonObjId:
    if _, err = reader.ReadN(12); err != nil {
        break
    }
case bsonBool:
    if _, err = reader.ReadByte(); err != nil {
        break
    }
case bsonRegexp:
    if _, err = reader.ReadBytes(nullByte); err != nil {
        break
    }
    if _, err = reader.ReadBytes(nullByte); err != nil {
        break
    }
case bsonDbPointer:
    l, err = reader.ReadInt()
    if err != nil {
        break
    }
    if _, err = reader.ReadN(l - 4 + 12); err != nil {
        break
    }
case bsonInt32:
    if _, err = reader.ReadN(4); err != nil {
        break
    }
}

Из всех вариантов полей мы читаем значения только для bsonDocument (рекурсивно вызывая себя же) и bsonString (у нас есть дополнительная логика по определению коллекции и типа запроса), остальные поля мы просто пропускаем.


Как ловить пакеты


На наших тестах использование raw sockets напрямую оказалось быстре, чем через pcap.
Возможно это было из-за старой версии libpcap, но мы планировали делать сниффер только под linux, поэтому решили не разбираться, а использовать gopacket.af_packet (тем более не нужно линковать агента с libpcap).


Raw sockets — это специльные сокеты в linux, через которые можно отправить полностью сформированный в userspace (а не ядре) пакет или получить пакеты с определенного сетевого интерфейса. Если говорить про сниффинг, пакеты от ядра попадают в userspace через циклический буфер, что позволяет не делать syscall на перехват каждого пакета. На эту тему есть подробный хардкор в документации ядра.


ZeroCopy


Так как мы обрабатываем пакеты в один поток, то можем использовать "ZeroCopy" интерфейс сниффера. Но при этом нужно помнить, что ссылок на данный участок памяти дальше в коде оставлять нельзя.


Разбор пакетов


Интерфейс разбора пакетов в gopacket устроен довольно гибко, поддерживает из коробки много разных протоколов, пользователю не нужно думать о том, как инкапсулированы данные верхнего уровня. Но вместе с этим этот интерфейс навязывает необходимость большого числа копирований данных и как следствие большую нагрузку как на CPU так и на GC.


Мы опять решили откинуть все лишнее:)


Наша задача из исходного ethernet фрэйма (а на выходе AF_PACKET мы получаем всегда ethernet) получить:


  • source ip
  • destination ip
  • source port
  • destination port
  • TCP seq (ниже объясню, зачем он нужен)
  • TCP payload (собственно данные протокола верхнего уровня)

Для простоты было решено пока не поддерживать IPv6.


В итоге получилась вот такая страшная функция
func DecodePacket(data []byte, linkType layers.LinkType, packet *TcpIpPacket) (err error) {
    var l uint16
    switch linkType {
    case layers.LinkTypeEthernet:
        if len(data) < 14 {
            ethernetTooSmall.Inc(1)
            err = errors.New("Ethernet packet too small")
            return
        }
        l = binary.BigEndian.Uint16(data[12:14])
        switch layers.EthernetType(l) {
        case layers.EthernetTypeIPv4:
            data = data[14:]
        case layers.EthernetTypeLLC:
            l = uint16(data[2])
            if l&0x1 == 0 || l&0x3 == 0x1 {
                data = data[4:]
            } else {
                data = data[3:]
            }
        default:
            ethernetUnsupportedType.Inc(1)
            err = errors.New("Unsupported ethernet type")
            return
        }
    default:
        unsupportedLinkProto.Inc(1)
        err = errors.New("Unsupported link protocol")
        return
    }
    //IP
    var cmp int
    if len(data) < 20 {
        ipTooSmallLength.Inc(1)
        err = errors.New("Too small IP length")
        return
    }
    version := data[0] >> 4
    switch version {
    case 4:
        if binary.BigEndian.Uint16(data[6:8])&0x1FFF != 0 {
            ipNonFirstFragment.Inc(1)
            err = errors.New("Non first IP fragment")
            return
        }
        if len(data) < 20 {
            ipTooSmall.Inc(1)
            err = errors.New("Too small IP packet")
            return
        }
        hl := uint8(data[0]) & 0x0F
        l = binary.BigEndian.Uint16(data[2:4])
        packet.SrcIp[0] = data[12]
        packet.SrcIp[1] = data[13]
        packet.SrcIp[2] = data[14]
        packet.SrcIp[3] = data[15]

        packet.DstIp[0] = data[16]
        packet.DstIp[1] = data[17]
        packet.DstIp[2] = data[18]
        packet.DstIp[3] = data[19]

        if l < 20 {
            ipTooSmallLength.Inc(1)
            err = errors.New("Too small IP length")
            return
        } else if hl < 5 {
            ipTooSmallHeaderLength.Inc(1)
            err = errors.New("Too small IP header length")
            return
        } else if int(hl*4) > int(l) {
            ipInvalieHeaderLength.Inc(1)
            err = errors.New("Invalid IP header length > IP length")
            return
        }
        if cmp = len(data) - int(l); cmp > 0 {
            data = data[:l]
        } else if cmp < 0 {
            if int(hl)*4 > len(data) {
                ipTruncatedHeader.Inc(1)
                err = errors.New("Not all IP header bytes available")
                return
            }
        }
        data = data[hl*4:]
    case 6:
        ipV6IsNotSupported.Inc(1)
        err = errors.New("IPv6 is not supported")
        return
    default:
        ipInvalidVersion.Inc(1)
        err = errors.New("Invalid IP packet version")
        return
    }
    //TCP
    if len(data) < 13 {
        tcpTooSmall.Inc(1)
        err = errors.New("Too small TCP packet")
        return
    }
    packet.SrcPort = binary.BigEndian.Uint16(data[0:2])
    packet.DstPort = binary.BigEndian.Uint16(data[2:4])
    packet.Seq = binary.BigEndian.Uint32(data[4:8])

    dataOffset := data[12] >> 4
    if dataOffset < 5 {
        tcpInvalidDataOffset.Inc(1)
        err = errors.New("Invalid TCP data offset")
        return
    }
    dataStart := int(dataOffset) * 4
    if dataStart > len(data) {
        tcpOffsetGreaterThanPacket.Inc(1)
        err = errors.New("TCP data offset greater than packet length")
        return
    }
    packet.Payload = data[dataStart:]
    return
}

Для подобных функций всегда стоит писать бенчмарки, на этот раз получилась достаточно приятная картина:


Benchmark_DecodePacket-4    50000000            27.9 ns/op
Benchmark_Gopacket-4         1000000          3351 ns/op

То есть мы получили ускорение больше чем в 100 раз.


Значительнуя часть кода этой функции занимает обработка ошибок, там же видно инкременты разных счетчиков из которых мы потом делаем служебные метрики агента и можем легко понять, почему у нас как-то не так работает сниффер. Например, о необходимости добавить поддержку IPv6 мы планируем узнать именно по такой метрике.


Еще мы не пытаемся склеивать tcp payload из разных пакетов, в случае когда данные не влезают в 1 ethernet фрэйм.
Если такой пакет — ответ mongodb, нас интересует только заголовок, а для больших insert запросов например, мы просто возьмем часть запроса из первого пакета.


Дубли пакетов


Выяснилось, что если клиент и сервер находятся на одном сервере, то мы ловим один и тот же пакет 2 раза.
Пришлось делать простой дедубликатор пакетов на основе src ip+port, dest ip+port и TCP seq.


Итого


  • В результате на нашем бенчмарке агент стал потреблять ~5% ядра вместо 70%
  • На этом мы пока решили остановиться с оптимизациями, но осталось несколько идей, как еще немного ускориться
  • Под реальной нагрузкой у клиентов агент работает примерно с теми же показателями (потребление cpu в той же пропорции к количеству пакетов, что и на бенчмарке)
Поделиться с друзьями
-->

Комментарии (6)


  1. dukelion
    29.08.2016 11:39

    А почему не использовали в такой ситуации инструментирование на клиенте?


    1. NikolaySivko
      29.08.2016 11:42

      Как правило наши клиенты не хотят ничего настраивать, а тем более модифицировать свои приложения. Приходится подстраиваться:)


  1. MaprapuH
    29.08.2016 14:17

    а исходники клиента можно посмотреть?


    1. NikolaySivko
      29.08.2016 14:18
      +1

      О каком клиенте идет речь? Я рассказывал о нашем мониторинговом агенте, который ставится на сервера клиентов и собирает метрики. Агент в данный момент не opensource.


  1. mahimus
    30.08.2016 12:03

    Помнится, в монге версий 2.х, от порядка полей в запросе зависит выбор индекса для обработки запроса. Так что не думаю, что сортировка — хорошая идея. Ее отключение позволит выявить неоптимизированные запросы.

    Насчет влияния порядка полей в версиях 3.х — не уверен.


    1. NikolaySivko
      30.08.2016 12:09

      Вообще говоря bson тип Document (он же dict/map) не сохраняет порядок полей. Мы сортируем, чтобы покрыть этот случай в первую очередь.