Команда Python for Devs подготовила перевод статьи о том, как найти самый быстрый способ загрузки данных в PostgreSQL с помощью Python. Автор пошагово сравнил разные методы — от построчных вставок до COPY с потоковой генерацией CSV — и показал, как ускорить процесс более чем в 250 раз при нулевом потреблении памяти.
Как разработчики, часто выполняющие роль «сантехников для данных», мы нередко сталкиваемся с задачей загрузки данных, полученных из удалённого источника, в наши системы. Если повезёт, данные будут сериализованы в формате JSON или YAML. Если повезёт меньше — получаем Excel-таблицу или CSV-файл, который всегда почему-то сломан.
Данные от крупных компаний или из старых систем почти всегда закодированы каким-то странным способом, а системные администраторы уверены, что помогают нам, запаковывая файлы в архив (пожалуйста, используйте gzip) или разбивая их на куски с случайными именами.
Современные сервисы иногда предоставляют вменяемый API, но чаще приходится скачивать файл с FTP, SFTP, S3 или из какой-то проприетарной системы, которая работает только под Windows.
В этой статье мы разберём лучший способ импортировать «грязные» данные из удалённого источника в PostgreSQL.
Чтобы предложить рабочее решение для реального кейса, мы определили следующие условия:
Данные загружаются из удалённого источника.
Данные грязные и требуют преобразования.
Данные большие.
Настройка: пивоварня
Я нашёл отличный публичный API связанный c пивом, поэтому будем загружать данные в таблицу beer в базе данных.
Данные
Один объект пива из API выглядит так:
$ curl https://api.punkapi.com/v2/beers/?per\_page=1&page=1
[
{
"id": 1,
"name": "Buzz",
"tagline": "A Real Bitter Experience.",
"first_brewed": "09/2007",
"description": "A light, crisp and bitter IPA ...",
"image_url": "https://images.punkapi.com/v2/keg.png",
"abv": 4.5,
"ibu": 60,
"target_fg": 1010,
"target_og": 1044,
"ebc": 20,
"srm": 10,
"ph": 4.4,
"attenuation_level": 75,
"volume": {
"value": 20,
"unit": "litres"
},
"contributed_by": "Sam Mason <samjbmason>"
"brewers_tips": "The earthy and floral aromas from...",
"boil_volume": {},
"method": {},
"ingredients": {},
"food_pairing": [],
}
]
Я немного обрезал вывод для краткости, но информации о пиве здесь очень много. В этой статье мы хотим загрузить в таблицу базы данных все поля до brewers_tips
.
Поле volume
вложенное. Нам нужно вытащить только значение и сохранить его в поле volume
в таблице:
volume = beer['volume']['value']
Поле first_brewed
содержит только год и месяц, а иногда только год. Нужно преобразовать это значение в корректную дату.
Например, значение 09/2007
должно стать 2007-09-01
.
А значение 2006
— 2006-01-01
.
Напишем простую функцию, которая преобразует текстовое значение поля в объект datetime.date
:
import datetime
def parse_first_brewed(text: str) -> datetime.date:
parts = text.split('/')
if len(parts) == 2:
return datetime.date(int(parts[1]), int(parts[0]), 1)
elif len(parts) == 1:
return datetime.date(int(parts[0]), 1, 1)
else:
assert False, 'Unknown date format'
Быстро проверим, что она работает:
>>> parse_first_brewed('09/2007')
datetime.date(2007, 9, 1)
>>> parse_first_brewed('2006')
datetime.date(2006, 1, 1)
В реальных задачах преобразования могут быть гораздо сложнее. Но для нашей цели этого более чем достаточно.
Загрузка данных
API возвращает результаты постранично. Чтобы инкапсулировать пагинацию, создадим генератор, который будет выдавать объекты пива по одному:
from typing import Iterator, Dict, Any
from urllib.parse import urlencode
import requests
def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]:
session = requests.Session()
page = 1
while True:
response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
'page': page,
'per_page': page_size
}))
response.raise_for_status()
data = response.json()
if not data:
break
yield from data
page += 1
Чтобы воспользоваться функцией-генератором, просто вызываем её и итерируемся:
>>> beers = iter_beers_from_api()
>>> next(beers)
{'id': 1,
'name': 'Buzz',
'tagline': 'A Real Bitter Experience.',
'first_brewed': '09/2007',
'description': 'A light, crisp and bitter IPA brewed...',
'image_url': 'https://images.punkapi.com/v2/keg.png',
'abv': 4.5,
'ibu': 60,
'target_fg': 1010,
...
}
>>> next(beers)
{'id': 2,
'name': 'Trashy Blonde',
'tagline': "You Know You Shouldn't",
'first_brewed': '04/2008',
'description': 'A titillating, ...',
'image_url': 'https://images.punkapi.com/v2/2.png',
'abv': 4.1,
'ibu': 41.5,
Вы заметите, что первый результат каждой страницы появляется чуть дольше. Это потому, что в этот момент выполняется сетевой запрос для получения страницы.
Создание таблицы в базе данных
Следующий шаг — создать таблицу в базе, в которую будем загружать данные.
Сначала создаём базу данных:
$ createdb -O haki testload
Замените haki
на имя вашего локального пользователя.
Чтобы подключиться к базе PostgreSQL из Python, используем библиотеку psycopg:
$ python -m pip install psycopg2
Создадим подключение к базе:
import psycopg2
connection = psycopg2.connect(
host="localhost",
database="testload",
user="haki",
password=None,
)
connection.autocommit = True
Мы включаем autocommit=True
, чтобы каждая выполненная команда сразу применялась. Для целей этой статьи это приемлемо.
Теперь, когда подключение готово, напишем функцию для создания таблицы:
def create_staging_table(cursor) -> None:
cursor.execute("""
DROP TABLE IF EXISTS staging_beers;
CREATE UNLOGGED TABLE staging_beers (
id INTEGER,
name TEXT,
tagline TEXT,
first_brewed DATE,
description TEXT,
image_url TEXT,
abv DECIMAL,
ibu DECIMAL,
target_fg DECIMAL,
target_og DECIMAL,
ebc DECIMAL,
srm DECIMAL,
ph DECIMAL,
attenuation_level DECIMAL,
brewers_tips TEXT,
contributed_by TEXT,
volume INTEGER
);
""")
Эта функция принимает курсор и создаёт unlogged-таблицу с именем staging_beers
.
Unlogged table: Данные, записанные в unlogged-таблицу, не попадают в журнал предзаписи (WAL), что делает её удобной для временных промежуточных таблиц. Однако стоит помнить, что unlogged-таблицы не будут восстановлены в случае сбоя и не участвуют в репликации.
Пример вызова функции с использованием ранее созданного подключения:
>>> with connection.cursor() as cursor:
>>> create_staging_table(cursor)
Теперь можно переходить к следующему шагу.
Метрики
В этой статье нас интересуют две основные метрики: время и память.
Измерение времени
Чтобы измерить время выполнения каждого метода, используем встроенный модуль time
:
>>> import time
>>> start = time.perf_counter()
>>> time.sleep(1) # выполняем работу
>>> elapsed = time.perf_counter() - start
>>> print(f'Time {elapsed:0.4}')
Time 1.001
Функция perf_counter
предоставляет таймер с наибольшим доступным разрешением, что делает её идеальной для наших целей.
Измерение памяти
Для измерения потребления памяти используем пакет memory-profiler:
$ python -m pip install memory-profiler
Этот пакет показывает использование памяти и приращение памяти на каждой строке кода. Это очень полезно при оптимизации по памяти. Для примера возьмём пример с PyPI:
$ python -m memory_profiler example.py
Line # Mem usage Increment Line Contents
==============================================
3 @profile
4 5.97 MB 0.00 MB def my_func():
5 13.61 MB 7.64 MB a = [1] * (10 ** 6)
6 166.20 MB 152.59 MB b = [2] * (2 * 10 ** 7)
7 13.61 MB -152.59 MB del b
8 13.61 MB 0.00 MB return a
Самая интересная колонка здесь — Increment, показывающая, сколько памяти дополнительно выделяется на каждой строке.
В этой статье нас интересует пиковое использование памяти функцией. Пиковое значение — это разница между стартовым значением колонки Mem usage и её максимальным значением (так называемая «high watermark»).
Чтобы получить список значений Mem usage, используем функцию memory_usage
из memory_profiler
:
>>> from memory_profiler import memory_usage
>>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)
При таком вызове memory_usage
выполняет функцию fn
с переданными аргументами args
и kwargs
, а также запускает отдельный процесс для отслеживания использования памяти каждые interval
секунд.
Для очень быстрых операций функция fn
может быть выполнена несколько раз. Чтобы этого избежать, устанавливаем interval
меньше 1e-6
, тогда функция выполнится только один раз.
Аргумент retval
говорит функции вернуть результат выполнения fn
.
Декоратор profile
Чтобы объединить всё вместе, создадим следующий декоратор, который будет измерять и выводить время и память:
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
@wraps(fn)
def inner(*args, **kwargs):
fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
print(f'\n{fn.__name__}({fn_kwargs_str})')
# Измеряем время
t = time.perf_counter()
retval = fn(*args, **kwargs)
elapsed = time.perf_counter() - t
print(f'Time {elapsed:0.4}')
# Измеряем память
mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
print(f'Memory {max(mem) - min(mem)}')
return retval
return inner
Чтобы исключить взаимное влияние измерения времени на измерение памяти и наоборот, мы выполняем функцию дважды:
первый раз — чтобы замерить время,
второй раз — чтобы замерить потребление памяти.
Декоратор выводит имя функции, переданные именованные аргументы, а затем сообщает время и память, которые она использовала:
>>> @profile
>>> def work(n):
>>> for i in range(n):
>>> 2 ** n
>>> work(10)
work()
Time 0.06269
Memory 0.0
>>> work(n=10000)
work(n=10000)
Time 0.3865
Memory 0.0234375
Обратите внимание: выводятся только именованные аргументы. Это сделано специально — мы будем использовать это в параметризованных тестах.
Бенчмарк
На момент написания статьи в API с пивом всего 325 сортов. Чтобы поработать с большим набором данных, мы размножим его 100 раз и будем хранить в памяти. Итоговый датасет будет содержать 32 500 записей:
>>> beers = list(iter_beers_from_api()) * 100
>>> len(beers)
32,500
Чтобы имитировать удалённый API, наши функции будут принимать итераторы, похожие на возвращаемое значение iter_beers_from_api
:
def process(beers: Iterator[Dict[str, Any]])) -> None:
# Process beers...
Для бенчмарка мы будем импортировать данные о пиве в базу данных. Чтобы исключить внешние факторы (например, сеть), мы заранее получим данные из API и раздаём их локально.
Чтобы получить точные замеры времени, «подделаем» удалённый API:
>>> beers = list(iter_beers_from_api()) * 100
>>> process(beers)
В реальной ситуации вы бы вызывали функцию iter_beers_from_api
напрямую:
>>> process(iter_beers_from_api())
Мы готовы начать!
Вставка строк по одной
Для базовой линии начнём с самого простого подхода — вставлять строки по одной:
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
for beer in beers:
cursor.execute("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", {
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
})
Обратите внимание: по мере итерации по списку мы преобразуем first_brewed
в datetime.date
и извлекаем volume
из вложенного поля volume
.
Запуск этой функции даёт следующий вывод:
>>> insert_one_by_one(connection, beers)
insert_one_by_one()
Time 128.8
Memory 0.08203125
Функция потратила 129 секунд на импорт 32 тыс. строк. Профилировщик памяти показывает, что функция потребляла совсем немного памяти.
Интуитивно понятно, что вставка строк по одной — не самый эффективный подход. Постоянные переключения контекста между программой и базой данных, скорее всего, сильно тормозят процесс.
Execute Many
Psycopg2 предоставляет способ вставлять сразу много строк с помощью executemany
. Из документации:
Выполнить операцию с базой данных (запрос или команду) для всех кортежей параметров или отображений, найденных в последовательности
vars_list
.
Звучит многообещающе!
Попробуем импортировать данные с использованием executemany
:
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
Эта функция очень похожа на предыдущую, и преобразования те же самые. Главное отличие в том, что мы сначала преобразуем все данные в памяти, и только потом загружаем их в базу.
Запуск даёт такой вывод:
>>> insert_executemany(connection, beers)
insert_executemany()
Time 124.7
Memory 2.765625
Не слишком радует. Время лишь чуть лучше, зато функция теперь потребляет 2.7 МБ памяти.
Чтобы оценить масштаб, JSON-файл, содержащий только импортируемые данные, весит на диске 25 МБ. Если сохранить пропорцию, импорт 1 ГБ таким методом потребует около 110 МБ памяти.
Вставка с итератором для executemany
Предыдущий подход потреблял много памяти, потому что преобразованные данные сохранялись в памяти перед тем, как их обрабатывал psycopg.
Посмотрим, можно ли использовать итератор, чтобы не хранить данные в памяти:
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers))
Отличие в том, что преобразованные данные «стримятся» в executemany
через итератор.
Эта функция даёт такой результат:
>>> insert_executemany_iterator(connection, beers)
insert_executemany_iterator()
Time 129.3
Memory 0.0
Наше «стриминговое» решение сработало как задумано: нам удалось свести потребление памяти к нулю. Однако по времени всё осталось примерно на уровне метода с построчной вставкой.
Execute Batch
В документации psycopg в разделе «fast execution helpers» есть примечание про executemany
:
Текущая реализация executemany() (мягко говоря) не отличается высокой производительностью. Эти функции можно использовать для ускорения повторного выполнения одного и того же выражения с набором параметров. За счёт сокращения количества обращений к серверу производительность может быть на порядки выше, чем при использовании executemany().
То есть мы изначально шли не тем путём!
Сразу под этим разделом описана функция execute_batch
:
Выполняет группу выражений с меньшим количеством обращений к серверу.
Реализуем загрузку с использованием execute_batch
:
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
Выполним функцию:
>>> insert_execute_batch(connection, beers)
insert_execute_batch()
Time 3.917
Memory 2.50390625
Вау! Это огромный скачок. Функция завершилась чуть меньше чем за 4 секунды — примерно в 33 раза быстрее, чем исходные 129 секунд.
Execute Batch с итератором
Функция execute_batch
использует меньше памяти, чем executemany
для тех же данных. Попробуем полностью убрать хранение данных в памяти и «передавать» их в execute_batch
с помощью итератора:
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers)
Выполнение функции:
>>> insert_execute_batch_iterator(connection, beers)
insert_execute_batch_iterator()
Time 4.333
Memory 0.2265625
Мы получили примерно то же самое время, но с меньшим использованием памяти.
Execute Batch с итератором и размером страницы
При чтении документации по execute_batch
обращает на себя внимание аргумент page_size
:
page_size – максимальное количество элементов
argslist
, которые включаются в одно SQL-выражение. Если элементов больше — функция выполнит несколько выражений.
В документации также указано, что функция работает быстрее за счёт уменьшения количества обращений к серверу. Логично предположить, что больший page_size
уменьшит количество таких обращений и ускорит загрузку.
Добавим аргумент page_size
в нашу функцию, чтобы поэкспериментировать:
@profile
def insert_execute_batch_iterator(
connection,
beers: Iterator[Dict[str, Any]],
page_size: int = 100,
) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers, page_size=page_size)
Теперь мы можем варьировать page_size
и сравнивать скорость загрузки, чтобы подобрать оптимальное значение.
Размер страницы по умолчанию — 100. Проведём бенчмарк с разными значениями и сравним результаты:
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1)
insert_execute_batch_iterator(page_size=1)
Time 130.2
Memory 0.0
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=100)
insert_execute_batch_iterator(page_size=100)
Time 4.333
Memory 0.0
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000)
insert_execute_batch_iterator(page_size=1000)
Time 2.537
Memory 0.2265625
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000)
insert_execute_batch_iterator(page_size=10000)
Time 2.585
Memory 25.4453125
Получились любопытные результаты, разберём по пунктам:
1: Результаты похожи на вставку строк по одной.
100: Это значение по умолчанию для
page_size
, поэтому результаты близки к предыдущему бенчмарку.1000: Время примерно на 40% лучше, при этом память используется мало.
10000: По времени почти не быстрее, чем при размере 1000, но потребление памяти значительно выше.
Результаты показывают компромисс между памятью и скоростью. В данном случае «золотая середина» — page_size = 1000
.
Execute Values
На этом «сокровища» в документации psycopg не заканчиваются. Пока листал документацию, на глаза попалась ещё одна функция — execute_values
:
Выполнить выражение с использованием
VALUES
и последовательности параметров.
execute_values
формирует большой список VALUES
прямо в запросе.
Попробуем:
import psycopg2.extras
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", [(
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers])
Импорт пива с помощью этой функции:
>>> insert_execute_values(connection, beers)
insert_execute_values()
Time 3.666
Memory 4.50390625
Сразу из коробки мы получили небольшое ускорение по сравнению с execute_batch
. Однако потребление памяти чуть выше.
Execute Values c итератором
Как и раньше, чтобы снизить потребление памяти, постараемся не хранить данные в памяти и использовать итератор вместо списка:
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers))
Выполнение функции даёт следующие результаты:
>>> insert_execute_values_iterator(connection, beers)
insert_execute_values_iterator()
Time 3.677
Memory 0.0
По времени почти то же самое, а память снова равна нулю.
Execute Values с итератором и размером страницы
Как и execute_batch
, функция execute_values
тоже принимает аргумент page_size
:
@profile
def insert_execute_values_iterator(
connection,
beers: Iterator[Dict[str, Any]],
page_size: int = 100,
) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers), page_size=page_size)
Запуск с разными размерами страниц:
>>> insert_execute_values_iterator(connection, iter(beers), page_size=1)
insert_execute_values_iterator(page_size=1)
Time 127.4
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=100)
insert_execute_values_iterator(page_size=100)
Time 3.677
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=1000)
insert_execute_values_iterator(page_size=1000)
Time 1.468
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=10000)
insert_execute_values_iterator(page_size=10000)
Time 1.503
Memory 2.25
Как и в случае с execute_batch
, здесь виден компромисс между памятью и скоростью. Оптимум снова примерно на уровне page_size = 1000
. Однако с execute_values
при том же размере страницы мы получили результат примерно на 20% быстрее, чем с execute_batch
.
COPY
В официальной документации PostgreSQL есть целый раздел о заполнении базы данных. Согласно документации, самый быстрый способ загрузить данные в базу — использовать команду COPY
.
Чтобы вызывать COPY
из Python, в psycopg есть специальная функция copy_from
. Команда COPY
ожидает CSV-файл. Попробуем преобразовать наши данные в CSV и загрузить их в базу с помощью copy_from
:
import io
def clean_csv_value(value: Optional[Any]) -> str:
if value is None:
return r'\N'
return str(value).replace('\n', '\\n')
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
csv_file_like_object = io.StringIO()
for beer in beers:
csv_file_like_object.write('|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['contributed_by'],
beer['brewers_tips'],
beer['volume']['value'],
))) + '\n')
csv_file_like_object.seek(0)
cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
Разберём по шагам:
-
clean_csv_value
: преобразует одиночное значение.Экранируем переводы строк: некоторые текстовые поля содержат переносы, поэтому заменяем
\n
→\\n
.Пустые значения превращаем в
\N
: строка"\N"
— это значение по умолчанию, которым PostgreSQL обозначаетNULL
вCOPY
(это можно изменить опциейNULL
).
csv_file_like_object
: создаём «файлоподобный» объект черезio.StringIO
. ОбъектStringIO
хранит строку, но ведёт себя как файл. В нашем случае — как CSV-файл.-
csv_file_like_object.write
: превращаем один объект пива в строку CSV.Трансформации: здесь же выполняем преобразования
first_brewed
иvolume
.Разделитель: в наборе данных есть поля со свободным текстом и запятыми. Чтобы избежать конфликтов, в качестве разделителя берём
"|"
(альтернатива — использоватьQUOTE
).
Посмотрим, окупились ли усилия:
>>> copy_stringio(connection, beers)
copy_stringio()
Time 0.6274
Memory 99.109375
COPY
— самый быстрый способ из всех, что мы видели! Процесс завершился меньше чем за секунду. Однако этот метод заметно прожорливее по памяти: функция потребляет 99 МБ — это более чем вдвое больше размера нашего JSON-файла на диске.
Копирование данных из итератора строк
Один из главных недостатков использования COPY
с StringIO
— весь файл создаётся в памяти. А что, если вместо формирования целого файла в памяти мы создадим файловый объект, который будет служить буфером между удалённым источником и командой COPY
? Такой буфер будет получать JSON через итератор, очищать и преобразовывать данные и на выходе выдавать «чистый» CSV.

Вдохновившись этим ответом на Stack Overflow, мы создали объект, который «питается» от итератора и предоставляет интерфейс, как у файла:
from typing import Iterator, Optional
import io
class StringIteratorIO(io.TextIOBase):
def __init__(self, iter: Iterator[str]):
self._iter = iter
self._buff = ''
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> str:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret):]
return ret
def read(self, n: Optional[int] = None) -> str:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return ''.join(line)
Чтобы показать, как это работает, вот как можно получить «файлоподобный» CSV-объект из списка чисел:
>>> gen = (f'{i},{i**2}\n' for i in range(3))
>>> gen
<generator object <genexpr> at 0x7f58bde7f5e8>
>>> f = StringIteratorIO(gen)
>>> print(f.read())
0,0
1,1
2,4
Обратите внимание, что мы использовали f
как файл. Внутри он подтягивал строки из gen
только тогда, когда его внутренний буфер строк был пуст.
Функция загрузки с использованием StringIteratorIO
выглядит так:
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\n'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|')
Главное отличие в том, что CSV с пивом потребляется по запросу, и данные не хранятся в памяти после использования.
Запустим функцию и посмотрим на результат:
>>> copy_string_iterator(connection, beers)
copy_string_iterator()
Time 0.4596
Memory 0.0
Отлично! Время небольшое, а память снова равна нулю.
Копирование данных из итератора строк с указанием размера буфера
Пытаясь «выжать» ещё каплю производительности, замечаем, что так же, как и page_size
, команда COPY
принимает схожий параметр size
:
size
— размер буфера, используемого для чтения из файла.
Добавим аргумент size
в функцию:
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\n'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
Значение по умолчанию для size
— 8192, то есть 2 ** 13
, поэтому будем использовать степени двойки:
>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time 0.4536
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time 0.4596
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time 0.4649
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time 0.6171
Memory 0.0
В отличие от предыдущих примеров, здесь, похоже, нет компромисса между скоростью и памятью. Это логично, потому что метод изначально спроектирован так, чтобы не потреблять память. Тем не менее, при изменении размера буфера время меняется. Для нашего набора данных оптимальным оказалось значение по умолчанию — 8192.
Русскоязычное сообщество про Python

Друзья! Эту статью перевела команда Python for Devs — канала, где каждый день выходят самые свежие и полезные материалы о Python и его экосистеме. Подписывайтесь, чтобы ничего не пропустить!
Результаты
ФУНКЦИЯ ВРЕМЯ (СЕК) ПАМЯТЬ (МБ)
insert_one_by_one() 128.8 0.08203125
insert_executemany() 124.7 2.765625
insert_executemany_iterator() 129.3 0.0
insert_execute_batch() 3.917 2.50390625
insert_execute_batch_iterator(page_size=1) 130.2 0.0
insert_execute_batch_iterator(page_size=100) 4.333 0.0
insert_execute_batch_iterator(page_size=1000) 2.537 0.2265625
insert_execute_batch_iterator(page_size=10000) 2.585 25.4453125
insert_execute_values() 3.666 4.50390625
insert_execute_values_iterator(page_size=1) 127.4 0.0
insert_execute_values_iterator(page_size=100) 3.677 0.0
insert_execute_values_iterator(page_size=1000) 1.468 0.0
insert_execute_values_iterator(page_size=10000) 1.503 2.25
copy_stringio() 0.6274 99.109375
copy_string_iterator(size=1024) 0.4536 0.0
copy_string_iterator(size=8192) 0.4596 0.0
copy_string_iterator(size=16384) 0.4649 0.0
copy_string_iterator(size=65536) 0.6171 0.0
Главный вопрос теперь: «Что использовать?» Как обычно, ответ: «Зависит от ситуации». У каждого метода есть свои плюсы и минусы, и каждый подходит под разные условия:
Вывод 1: Предпочитайте встроенные подходы для сложных типов данных.
executemany
,execute_values
иexecute_batch
сами занимаются преобразованием типов из Python в типы базы данных. Варианты с CSV требуют экранирования.Вывод 2: Предпочитайте встроенные подходы для небольших объемов данных.
Встроенные подходы читаемее и с меньшей вероятностью сломаются в будущем. Если время и память не критичны — оставляйте все простым!Вывод 3: Предпочитайте подходы с COPY для больших объемов данных.
COPY лучше подходит для больших объемов, где память может стать проблемой.
Исходный код для этого бенчмарка доступен здесь.
Akuma
Лучше конечно делать тесты на больших данных. Там лучше видно разницу.
32 тыс строк - это ни о чем, просто засунул в транзакцию и ладно.