При работе с базой данных (в частности с PostgreSQL) у меня появилась идея выбирать данные из таблицы параллельно (используя ЯП Go). И я задался вопросом «возможно ли сканировать строки выборки в отдельных гоурутинах».
Как оказалось, func (*Rows) Scan нельзя вызывать одновременно в гоурутинах. Исходя из этого ограничения, я решил выполнять параллельно со сканированием строк другие процессы, в частности, подготовку результирующих данных.
Т.к. Scan складывает данные по указателям, я решил сделать два среза (позже я поясню почему именно два), между которыми я буду переключать Scan, в то время как остальные гоурутины будут разбираться с уже выбранными данными.
Изначально мне нужно знать количество колонок выборки:
columns, err = rows.Columns()
count := len(columns)
Далее я создаю по два среза со значениями и с указателями на эти значения (куда и буду складывать данные во время сканирования строк):
values := make([]interface{}, count)
valuesPtrs := make([]interface{}, count)
values_ := make([]interface{}, count)
valuesPtrs_ := make([]interface{}, count)
for i := range columns {
valuesPtrs[i] = &values;[i]
valuesPtrs_[i] = &values;_[i]
}
В данном примере я буду складывать результат выборки в map[string]string, где ключами будут имена колонок. Можно использовать конкретную структуру с указанием типов, но т.к. цель данной публикации узнать у хабрасообщества насколько жизнеспособный предлагаемый подход, остановимся на выборке в map.
Далее я отделяю две гоурутины, одна из которых будет формировать результирующий map:
func getData(deleteNullValues bool, check, finish chan bool, dbData chan interface{}, columns []string, data *[]map[string]string) {
lnc := len(columns)
for <-check {
row := make(map[string]string)
for i := 0; i < lnc; i++ {
el := <-dbData
b, ok := el.([]byte)
if ok {
row[columns[i]] = string(b)
} else {
if el == nil {
if deleteNullValues == false {
row[columns[i]] = ""
}
} else {
row[columns[i]] = fmt.Sprint(el)
}
}
}
*data = append(*data, row)
}
finish <- true
}
А вторая будет переключаться между двумя срезами со значениями, сформированными Scan и отправлять их в канал для предыдущей гоурутины (которая формирует результат):
func transferData(values, values_ []interface{}, dbData chan interface{}, swtch, working, check chan bool) {
for <-working {
check <- true
switch <-swtch {
case false:
for _, v := range values {
dbData <- v
}
default:
for _, v := range values_ {
dbData <- v
}
}
}
}
Основной процесс будет заниматься переключением между срезами указателей и выбирать данные:
for rows.Next() {
switch chnl {
case false:
if err = rows.Scan(valuesPtrs...); err != nil {
fmt.Printf("rows.Scan: %s\n%s\n%#v\n", err, query, args)
return nil, nil, err
}
default:
if err = rows.Scan(valuesPtrs_...); err != nil {
fmt.Printf("rows.Scan: %s\n%s\n%#v\n", err, query, args)
return nil, nil, err
}
}
working <- true
swtch <- chnl
chnl = !chnl
}
В базе данных я сформировал таблицу с 32-я колонками и добавил в нее 100k строк.
В результате теста (при выборке данных 50 раз) у меня получились следующие данные:
Time spent: 1m8.022277124s — выборка результата с использованием одного среза
Time spent: 1m7.806109441s — выборка результата с использованием двух срезов
При увеличении количества итераций до 100:
Time spent: 2m15.973344023s — выборка результата с использованием одного среза
Time spent: 2m15.057413845s — выборка результата с использованием двух срезов
Разница увеличивается при увеличении объема данных и увеличении колонок в таблице.
Однако обратный результат наблюдался при уменьшении объема данных или при уменьшении кол-ва колонок таблицы, что, в принципе, понятно, т.к. накладные расходы подготовительных шагов и отделения гоурутин «съедают» драгоценное время и результат нивелируется.
Что касается двух срезов и двух гоурутин: я проводил тесты с большим количеством срезов, но время выборки увеличивалось, т.к., очевидно, функции getData и transferData обрабатывают данные быстрее, чем происходит сканирование значений из базы. Поэтому, даже при наличии большего количества ядер, нет смысла добавлять новые срезы для Scan и дополнительные горутины (разве что на совсем диких объемах данных).
В коде на гитхаб я привожу рабочий пример данного подхода. В моих задачах используются еще другие пакеты, которые я вычистил из приведенного когда, но основная идея не должна от этого пострадать.
В общем, жду от заинтересованного сообщества конструктивной критики. Спасибо!
astec
Прямо какой то Go-антипатерн.
Если обрабочик быстрее потока, то нет смысла делать несколько потоков потому что пропусканая способность канала вряд ли от этого увеличится.
Если обработчик медленнее потока достаточно в Scan просто запускать новую рутину на каждую запись на обработку. Ну или складывать в канал для последующей обработки ограниченным количеством рутин.
helgihabr Автор
Запускать новую рутину на каждую запись нельзя, т.к. накладные расходы на отделение потока будут огромны и больше затормозят весь процесс выборки, чем как-то помогут.
Рутины нужно запускать до процесса/цикла выборки и уже отправлять им данные через буферизированные каналы.
Deosis
Попробуйте организовать пул воркеров из примера и сравнить производительность
Jouretz
Вы это померяли или просто так пишете?
Рутины так-то не создают потоки, а запускаются в уже готовых. Там затраты на запуск рутины чуть больше чем на вызов функции.
helgihabr Автор
Я это мерял (и не только в этой задаче). В принципе, это известная тема: запуск рутины «дешевый», но если вы будете делать это тысячи раз в цикле, деградация в скорости будет колоссальной.
Кстати, вы, случайно, не путаете поток с каналом?