Меня зовут Артём, я работаю в Rambler Group в проектe «Поток» на позиции Go lead developer.
Мы потратили достаточно много времени на укрощение mysql binlog. В этой статье рассказ о том, как быстро и с минимальным количеством подводных камней внедрить механизм работы с бинлогом на Go.
Зачем нам это нужно?
Под капотом Потока есть высоконагруженные модули, где каждый запрос к базе отдаляет пользователя от получения результата. Кешировать — неплохое решение, но когда сбрасывать кеш? Пускай сами данные нам сообщают, что обновились.
В mysql есть такая вещь, как master-slave репликация. Наш демон может прикинуться slave и по binlog получать данные. Binlog должен быть настроен в row формате. В нем содержатся все команды изменений базы данных, команды под транзакцией выполняются только после commit. При достижении максимально разрешённого размера (1 гиг по умолчанию) создаётся следующий файл. Каждый новый файл имеет порядковый номер после имени.
Чуть больше инфы здесь или здесь.
В статье две части:
1. Как быстро запустить обработку записей пришедших в лог.
2. Как настраивать и расширять то, что под капотом.
Часть 1. Запускаемся как можно скорее.
Для работы с binlog будем пользоваться библиотекой github.com/siddontang/go-mysql
Подключимся к новому каналу (для работы с каналами требуется ROW format для binlog`а).
func binLogListener() {
c, err := getDefaultCanal()
if err == nil {
coords, err := c.GetMasterPos()
if err == nil {
c.SetEventHandler(&binlogHandler{})
c.RunFrom(coords)
}
}
}
func getDefaultCanal() (*canal.Canal, error) {
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)
cfg.User = "root"
cfg.Password = "root"
cfg.Flavor = "mysql"
cfg.Dump.ExecutionPath = ""
return canal.NewCanal(cfg)
}
Создадим обертку над бинлогом:
type binlogHandler struct {
canal.DummyEventHandler // Стандартная заготовка из библиотеки
BinlogParser // Наш кастомный хелпер для обработки
}
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil}
func (h *binlogHandler) String() string {return "binlogHandler"}
BinlogParser
Расширим логику работы с полученной строкой бинлога, добавив логику в метод OnRow().
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
var n int //начальное значение
var k int // шаг
switch e.Action {
case canal.DeleteAction:
return nil // за рамками примера
case canal.UpdateAction:
n = 1
k = 2
case canal.InsertAction:
n = 0
k = 1
}
for i := n; i < len(e.Rows); i += k {
key := e.Table.Schema + "." + e.Table.Name
switch key {
case User{}.SchemaName() + "." + User{}.TableName():
/*
Разбор данных юзера
*/
}
}
return nil
}
Суть данной обертки — разобрать пришедшие данные. Данные нам приходят по две записи на обновление строки (первая строка будет содержать исходные данные, вторая — обновленные). Тут же мы рассмотрим еще и возможность мультиинсертов и мультиапдейтов. В этом случае нам надо будет брать для UPDATE каждую вторую запись. Для этого в примере выше мы ввели n и k.
Сделаем модель для получения данных из binlog. В нее мы будем считывать данные из полученных строк. В аннотациях укажем названия колонок:
type User struct {
Id int `gorm:"column:id"`
Name string `gorm:"column:name"`
Status string `gorm:"column:status"`
Created time.Time `gorm:"column:created"`
}
func (User) TableName() string {
return "User"
}
func (User) SchemaName() string {
return "Test"
}
Структура таблицы в MYSQL:
CREATE TABLE Test.User
(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(40) NULL ,
status ENUM("active","deleted") DEFAULT "active",
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
)
ENGINE =InnoDB;
Разберем сам парсинг — на место заготовки под парсинг данных добавим:
user := User{}
h.GetBinLogData(&user, e, i)
По сути этого достаточно — у нас будут данные новой записи в модели user, но для наглядности выведем их:
if e.Action == canal.UpdateAction {
oldUser := User{}
h.GetBinLogData(&oldUser, e, i-1)
fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, )
} else {
fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, )
}
Главный момент, к которому стремились — запустим наш “Hello binlog world”:
func main() {
go binLogListener()
// тут может быть ваш код
time.Sleep(2 * time.Minute)
fmt.Print("Thx for watching, goodbuy")
}
Дальше добавим и обновим значения:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack");
UPDATE Test.User SET name="Jonh" WHERE id=1;
Мы увидим:
User 1 is created with name Jack
User 1 name changed from Jack to Jonh
Полученный код работает с binlog и разбирает новые строки. При получении записи из нужной нам таблицы, код считывает данные в структуру и выводит результат. За кадром остался парсер данных (BinlogParser), который заполнял модель.
Часть 2. Как говорил Кобб, нам нужно на уровень ниже
Рассмотрим внутреннюю работу парсера, которая базируется на рефлексии.
Для заполнения модели данными мы использовали метод обработчика:
h.GetBinLogData(&user, e, i)
Он парсит простые типы данных:
bool
int
float64
string
time.Time
и может парсить сложные структуры из json.
Если поддерживаемых типов вам недостаточно или вы просто хотите понять, как работает разбор бинлога, то можно попрактиковаться в добавлении собственных типов.
Сначала рассмотрим как заполнить данные для поля модели на примере поля Id типа int:
type User struct {
Id int `gorm:"column:id"`
}
Через рефлексию получим наименование типа. Метод parseTagSetting преобразует аннотации в более удобную структуру:
element := User{} //на входе у нас обычно интерфейс, но тут рассмотрим сразу модель
v := reflect.ValueOf(element)
s := reflect.Indirect(v)
t := s.Type()
num := t.NumField()
parsedTag := parseTagSetting(t.Field(k).Tag)
if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" {
continue
}
for k := 0; k < num; k++ {
name := s.Field(k).Type().Name()
switch name {
case "int":
// тут будет разбор строки
}
}
Получив тип int, можно задать его значение через метод рефлексии:
func (v Value) SetInt(x int64) {//...
Метод для парсинга аннотаций:
func parseTagSetting(tags reflect.StructTag) map[string]string {
setting := map[string]string{}
for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {
tags := strings.Split(str, ";")
for _, value := range tags {
v := strings.Split(value, ":")
k := strings.TrimSpace(strings.ToUpper(v[0]))
if len(v) >= 2 {
setting[k] = strings.Join(v[1:], ":")
} else {
setting[k] = k
}
}
}
return setting
}
На вход он принимает int64. Сделаем метод, который переведет полученные данные из бинлога в int64:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {
columnId := m.getBinlogIdByName(e, columnName)
if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {
panic("Not int type")
}
switch e.Rows[n][columnId].(type) {
case int8:
return int64(e.Rows[n][columnId].(int8))
case int32:
return int64(e.Rows[n][columnId].(int32))
case int64:
return e.Rows[n][columnId].(int64)
case int:
return int64(e.Rows[n][columnId].(int))
}
return 0
}
Все выглядит логично, кроме метода getBinlogIdByName().
Этот тривиальный хелпер нужен, чтобы работать с названиями колонок вместо их порядкового номера, что позволяет:
- брать названия колонок из gorm аннотаций;
- нет необходимости вносить правки при добавлении колонок в начало или середину;
- банально удобнее работать с полем name, чем с колонкой номер 3.
В итоге добавим сам обработчик:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
Рассмотрим еще два примера
ENUM: тут значения приходят как index — то есть статус “active” придет как 1. Нам в большинстве случаев нужно строковое представление enum. Его можно получить из описания поля. При парсинге enum значения, оно приходит начиная с 1, но сам массив возможных значений начинается от 0.
Обработчик Enum может выглядеть вот так:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {
columnId := m.getBinlogIdByName(e, columnName)
if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {
values := e.Table.Columns[columnId].EnumValues //значения полей
if len(values) == 0 || e.Rows[n][columnId] == nil {{
return ""
}
return values[e.Rows[n][columnId].(int64)-1] //первое значение в результате соответствует 0 значению в значениях
}
Я хочу хранить JSON
Хорошая идея, почему нет. JSON с точки зрения mysql — это строка. Надо как-то указать, что эти данные сериализованы — для этого мы добавим к gorm неканоничную аннотацию “fromJson”.
Представим, что такую структуру надо считать:
type JsonData struct {
Int int `gorm:"column:int"`
StructData TestData `gorm:"column:struct_data;fromJson"`
MapData map[string]string `gorm:"column:map_data;fromJson"`
SliceData []int `gorm:"column:slice_data;fromJson"`
}
type TestData struct {
Test string `json:"test"`
Int int `json:"int"`
}
Можно написать много условий и, наверное, получится. Но каждый новый тип данных убьет все старания. Хотя попытка найти ответы на stackoverflow — «как привести и десериализовать неизвестный тип структуры» начинается с фразы: «Непонятно зачем вам это надо, но попробуйте …».
Приведя нужный тип к интерфейсу, мы сможем это сделать:
if _, ok := parsedTag["FROMJSON"]; ok {
newObject := reflect.New(s.Field(k).Type()).Interface()
json := m.stringHelper(e, n, columnName)
jsoniter.Unmarshal([]byte(json), &newObject)
s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))
}
Если остались вопросы по типам данных, можно посмотреть тесты или задать их в комментариях.
Что получилось в итоге.
uran238
Не смотрели на dev.mysql.com/doc/refman/5.7/en/audit-log-legacy-filtering.html?
jacksparrow Автор
В audit_log у нас прилетают запросы. Row формат binlog'a содержит данные. Для задачи обновления данных в памяти демонов, это подходит лучше.