Привет, Хабр!

Сегодня рассмотрим решение одной непростой задачи, как в Django выдавать очень большие объёмы данных, например, выгрузку в CSV или потоковый JSON‑формат NDJSON) так, чтобы сервер не ложился от нагрузки, а пользователи быстрее получали первые данные. Разберём, как использовать StreamingHttpResponse и генераторы (в том числе асинхронные) для стриминга больших ответов и поговорим нюансах.

Зачем стримить ответы и в чем проблема больших выгрузок

Допустим у вас в приложении появилась необходимость отдать пользователю файл с огромным количеством данных, например, отчёт в CSV на сотни тысяч строк, или API должен вернуть большую выборку объектов в формате JSON. Если делать это обычным способом, т.е собрать весь контент в строку или список Python и вернуть через HttpResponse возникают две большие проблемы:

  • Память. Формирование большого ответа целиком требует держать все данные в памяти. Если записей сотни тысяч, это запросто выжрет гигабайты RAM и может положить сервер.

  • Время до первого байта (TTFB). Пока сервер собирает весь ответ целиком, пользователь ничего не получает. Может пройти много секунд (а то и минут), прежде чем первый байт долетит до клиента. Браузер или клиентский скрипт всё это время висит в ожидании. Это ухудшает UX и может привести к таймаутам на стороне балансировщика или прокси.

Итого, классический подход HttpResponse с единовременным формированием контента не вариант для больших ответов. Нам нужно начинать отправлять данные клиенту постепенно, кусочками, по мере генерации. И желательно не держа всё в памяти сразу.

StreamingHttpResponse – решение для больших ответов

К счастью большому счастью, Django предоставляет специальный класс ответа: StreamingHttpResponse. Его фича в том, что он позволяет отправлять тело ответа в виде итератора, выдавая данные чанками. Сервер начинает транслировать эти порции клиенту сразу, как только они готовы, не дожидаясь полного формирования всего ответа.

Если коротко, при возврате StreamingHttpResponse Django не хранит контент целиком, а передаёт генератор веб‑серверу (WSGI/ASGI), и тот последовательно читает из него и отправляет клиенту кусочки (обычно через механизм Transfer-Encoding: chunked в HTTP/1.1). Ваша задача дать StreamingHttpResponse итерируемый объект, который по требованию будет выдавать части ответа (строки или байты). Например, можно написать генератор‑функцию в Python с yield и передать её. Django сам позаботится, чтобы значения из генератора превратились в байты и улетели в ответan.

Простейший приме, возьмём строчку «Hello, Habr!» и отправим её по частям:

from django.http import StreamingHttpResponse

def stream_hello(request):
    def content():
        yield "Hello, "
        yield "Habr!"
    return StreamingHttpResponse(content(), content_type="text/plain")

Клиент в итоге получает объединённую строку «Hello, Habr!», но отправлена она будет двумя чанками. Конечно, наш пример игрушечный, строка маленькая, и смысла стримить её нет. Но принцип тот же самый для файлов в гигабайты: итерируем и отправляем кусками, не накапливая всё сразу.

Мы не указываем Content‑Length. При стриминге его и не может быть, ведь мы заранее не знаем, сколько данных выйдет (или нам невыгодно это вычислять). Отсутствие Content‑Length сигнал клиенту, что ответ chunked, и надо читать до закрытия соединения.

Синхронно или асинхронно?

Django поддерживает StreamingHttpResponse как в классическом синхронном режиме (WSGI), так и в асинхронном (ASGI). Понимаем разницу:

  • WSGI (sync): генератор будет выполняться в основном потоке запроса. Пока он не отдаст все данные, этот рабочий процесс (например, один из воркеров Gunicorn) занят. Да, мы сэкономим память, но время обработки запроса всё равно большое, и этот воркер весь период занят одной задачей. Если таких запросов много, может возникнуть очередь, каждый занимает воркер на длительное время.

  • ASGI (async): можно написать асинхронную view (через async def) и сделать генератор асинхронным. Тогда пока генератор ждёт I/O (например, чтение из базы или файла с помощью await), поток выполнения может переключаться на обработку других запросов. Это позволяет одновременно держать много открытых стриминговых соединений без выделения отдельного потока на каждое. Отлично подходит для задач типа Server‑Sent Events, длинных опросов и просто очень долгих выгрузок, сервер остаётся отзывчивым.

Проще говоря, синхронный стриминг экономит память и даёт данные частями, но не уменьшает нагрузки на воркеры по времени. Асинхронный стриминг позволяет ещё и не блокировать поток исполнения, если вы используете асинхронные операции внутри генератора.

Однако, если внутри генератора вы делаете тяжёлые вычисления CPU, асинхронность не поможет, GIL не позволит исполнять два Python‑треда параллельно на одном ядре. В таких случаях стоит выносить тяжёлую логику за пределы веб‑сервера (например, в Celery‑задачу, формировать файл заранее, а Django пусть только отдаёт). Но это уже другая история.

Тип итератора должен соответствовать типу view. Django требует: если view синхронный (исполняется под WSGI), то передавать обычный синхронный итератор; если view асинхронный (ASGI), то дать асинхронный async for итератор. Если перепутать, Django постарается сконвертировать, например, асинхронный генератор в синхронном контексте он полностью вычитает целиком в память (тем самым теряя весь смысл стриминга!) и выдаст warning. Так что не путайте: для async def view возвращайте StreamingHttpResponse с AsyncIterator (типа async def gen: yield ...), а для обычного def с обычным генератором. Мы рассмотрим примеры обоих подходов.

Генерируем CSV

Начнём с классического сценария: выгрузка большого CSV‑файла. Допустим, у нас в базе много записей, и мы хотим отдать их пользователю в виде CSV‑файла, чтобы он мог скачать и открыть в Excel/LibreOffice. Пусть это будет некий список пользователей или отчёт, неважно. Главное, что записей тысячи или сотни тысяч.

Старая школа: HttpResponse (не делайте так)

Для понимания масштаба проблемы взглянем на упрощённый код без стриминга (синхронный):

import csv
from django.http import HttpResponse

def download_big_csv(request):
    # Получаем все объекты (например, всех пользователей)
    users = User.objects.all()
    # Создаём ответ и writer
    response = HttpResponse(content_type="text/csv")
    response['Content-Disposition'] = 'attachment; filename="users.csv"'
    writer = csv.writer(response)
    # Записываем заголовок CSV
    writer.writerow(["ID", "Username", "Email"])
    # Пишем все строки
    for user in users:
        writer.writerow([user.id, user.username, user.email])
    return response

Мы читаем все объекты из QuerySet (при итерировании users Django загрузит их пачками, но в итоге переберёт всех). Для каждой строки вызываем writer.writerow, который пишет сразу в response (HttpResponse как файловый объект накапливает данные в памяти). Только после цикла весь контент будет готов, и Django отправит ответ целиком. Если пользователей десятки тысяч, этот код потратит кучу памяти на формирование CSV‑строки внутри HttpResponse (плюс сами объекты занимают память). Пользователь получит файл только когда цикл завершится, может пройти значительное время.

Для больших данных так делать нельзя. Нам надо переписать на стриминг.

Стриминг

Используем StreamingHttpResponse и генератор. В доках джанго есть интересный трюк с классом‑обёрткой, чтобы удобно использовать csv.writer. Дело в том, что csv.writer умеет писать в файлоподобный объект через вызовы метода .write(). А StreamingHttpResponse готов принимать итератор строк/байтов. Как совместить? Сделаем буфер‑обёртку:

class Echo:
    """Простой файловый объект, который вместо записи возвращает переданные данные."""
    def write(self, value):
        return value

Этот класс при вызове write просто возвращает value. Если передать экземпляр Echo в csv.writer как файл, то writer.writerow(row) будет вызывать Echo.write(...) и получит назад строку CSV. Таким образом writerow вернёт нам подготовленную CSV‑строчку! Мы можем сразу её yield‑нуть в поток.

Теперь соберём все части вместе:

import csv
from django.http import StreamingHttpResponse

class Echo:
    def write(self, value):
        return value

def stream_users_csv(request):
    # Готовим запрос к базе
    queryset = User.objects.all().iterator(chunk_size=1000)  # итератор по 1000 объектов, чтобы не грузить всё сразу
    # Создаём writer
    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)
    # Генератор, который будет по запросу отдавать CSV строки
    def generate():
        # Заголовок CSV
        yield writer.writerow(["ID", "Username", "Email"])
        for user in queryset:
            # Преобразуем объект в строку CSV
            yield writer.writerow([user.id, user.username, user.email])
    # Формируем StreamingHttpResponse с генератором
    response = StreamingHttpResponse(generate(), content_type="text/csv; charset=utf-8")
    response['Content-Disposition'] = 'attachment; filename="users.csv"'
    return response

queryset = User.objects.all().iterator(chunk_size=1000): применяем метод .iterator() у QuerySet, чтобы итерироваться по результатам напрямую из базы без загрузки всех объектов в память сразу. Параметр chunk_size=1000, Django будет подтягивать записи порциями по 1000.

Используем наш Echo и csv.writer(pseudo_buffer). Теперь writer.writerow возвращает готовую CSV‑строку (типа str с завершающим переводом строки). Мы сразу yield эту строку из генератора.

Первым делом отправляем строку заголовков. Затем в цикле для каждого пользователя отправляем строку с его данными.

Возвращаем StreamingHttpResponse(generate(), ...), Django начнёт итерироваться по generate() и отправлять кусочки. Благодаря chunked‑трансферу, браузер начнёт приём файла сразу, а не по завершении всего процесса.

В этом подходе в памяти в каждый момент находится только одна строка CSV и один объект пользователя (плюс небольшой буфер от ORM). Пиковое потребление памяти резко падает по сравнению с наивной реализацией. Как только пользователь прочитан и сгенерирована строка, Python переходит к следующему, предыдущие строки уже могут быть отправлены и освобождены.

Time To First Byte тоже улучшается: заголовок и первая порция данных уйдут практически сразу, как только мы начнём итерацию. Пользователь может уже видеть начавшееся скачивание файла, пока сервер дописывает оставшиеся строки.

В режиме DEBUG стриминг может работать не так, как в продакшене. Dev‑сервер Django не всегда отсылает чанки немедленно. Поэтому тестировать лучше на боевом окружении. И обязательно убедитесь, что прокси не буферизует ответ, об этом чуть ниже.

Теперь наш CSV‑файл может быть сколь угодно большим, ограничение только во времени выполнения запроса. Если генерация займёт час, соединение будет час качать файл. Но сервер при этом не упадёт из‑за памяти. Однако, час держать воркер занятым тоже не идеально. Тут нас выручит асинхронность.

NDJSON: стриминг JSON по строкам

Прежде чем перейдём к асинхронным генераторам, рассмотрим другой формат, NDJSON (Newline Delimited JSON). Это такой способ передать много JSON‑объектов потоком: каждая строка — отдельный валидный JSON, объекты разделяются переводом строки. Такой формат удобен для стриминга: клиент может парсить поступающие строки по одной, не дожидаясь конца всего документа. В отличие от обычного JSON‑массива, NDJSON не требует сначала собрать весь массив целиком и не содержит общей структуры, просто подряд идущие JSON‑объекты, разделённые newline.

Кейс: API, которое отдаёт длинный список записей. Можно, конечно, сформировать огромный JSON‑массив и стримить его, но это сложно: надо сначала вывести [, потом между объектами ставить запятые, учесть последний элемент и закрыть ]. При NDJSON ничего этого не надо, просто шлём объекты подряд. Минус в том, что клиент не сможет автоматически распарсить как обычный JSON (он получит несколько JSON подряд). Но многие библиотеки и инструменты поддерживают NDJSON, да и написать парсер несложно (читаем строчку — делаем JSON.parse).

Сделаем view, которая стримит данные в NDJSON. Пусть это будет список пользователей, как раньше, но в JSON:

import json
from django.http import StreamingHttpResponse

def stream_users_ndjson(request):
    qs = User.objects.all().iterator(chunk_size=1000)
    def generate():
        for user in qs:
            data = {
                "id": user.id,
                "username": user.username,
                "email": user.email,
            }
            # Преобразуем в JSON-строку и добавляем перевод строки
            json_line = json.dumps(data, ensure_ascii=False)
            yield json_line + "\n"
    response = StreamingHttpResponse(generate(), content_type="application/x-ndjson; charset=utf-8")
    # Можно также добавить Content-Disposition, если хотим скачивать как файл .ndjson
    # response['Content-Disposition'] = 'attachment; filename="users.ndjson"'
    return response

Здесь всё похоже на предыдущий пример, только вместо CSV пишем JSON. Несколько моментов:

  • ensure_ascii=False позволяет получать Unicode прямо в JSON (не escape‑последовательности), полезно, если у вас есть не‑ASCII символы (например, кириллица в именах), чтобы в файле были читаемые буквы. Content‑Type указываем с charset=utf-8, чтобы клиент правильно понял кодировку.

  • Мы не используем никаких специальных библиотек, просто json.dumps построчно. Если структура сложная, можно вручную сериализовать или использовать Django REST Framework сериализатор, но тогда придётся позаботиться о превращении его вывода в итератор. Наш пример базовый.

  • Обратите внимание, стоит application/x-ndjson. Согласно спецификации NDJSON, именно этот MIME следует использовать. Не application/json, потому что формально весь поток не является валидным JSON‑документом целиком. Некоторые клиенты могут по Content‑Type пытаться парсить JSON автоматически, и у них ничего не выйдет. Поэтому ставим правильный тип. Для CSV мы ставили text/csv, для NDJSON application/x-ndjson.

Такой ответ можно потреблять, например, из JavaScript. Если вы делаете запрос из браузера через fetch, можно читать поток через стрим API (ReadableStream) и разбивать по newline, либо использовать EventSource (правда, для EventSource контент‑тайп должен быть text/event-stream с определённым форматом событий, это не совсем наш случай). Обычно NDJSON используется либо для скачивания файлов (типа логов), либо для API, где клиентская сторона скрипт, умеющий стримить.

Кстати, Django REST Framework из коробки не умеет стримить списки через стандартный Response. Но вы всегда можете вернуть из DRF чистый StreamingHttpResponse. Главное убедиться, что DRF не попробует обработать ответ (например, отключить пагинацию, которая пытается прочитать весь queryset). Про DRF и стриминг, тема для отдельной статьи.

Асинхронный стриминг

Теперь самое интересное, сделаем то же самое, но асинхронно. Предположим, наш проект работает на Django 4+ или 5 под ASGI‑сервером, и мы хотим использовать async def view. Зачем? Как упоминалось, асинхронная реализация позволит не блокировать поток выполнения на время стриминга. Например, если внутри генератора надо обращаться к базе, читать из файла или ждать ответа от внешнего API, мы можем await эти операции. Пока они выполняются, сервер может переключиться на обработку других запросов. В итоге даже при нескольких одновременных больших стримах сервер не встанет колом, а будет обслуживать их кооперативно.

Async-генератор для CSV

Начнём с CSV‑примера, адаптировав его под асинхронный режим. Код станет немного сложнее из‑за сочетания async/await и генератора, но понятный.

Django ожидает, что для асинхронного StreamingHttpResponse мы передадим асинхронный итератор. В Python можно сделать асинхронный генератор, используя async def с yield.

Например:

async def my_async_gen():
    yield b"chunk1"
    await asyncio.sleep(1)
    yield b"chunk2"

Такой генератор при итерации через async for будет выдавать байты, и между ними мы можем выполнять await.

Но проблема: библиотека csv синхронная, и наш трюк с Echo возвращает строку сразу.

Мы не можем напрямую await writer.writerow, он не асинхронный. И чтение QuerySet у Django тоже синхронное (на момент написания, хотя в Django 5 появился экспериментальный async iterator). Поэтому нам придётся комбинировать: вызывать синхронные операции через sync_to_async.

Напишем асинхронную view:

import asyncio
from asgiref.sync import sync_to_async
from django.http import StreamingHttpResponse

class Echo:
    def write(self, value):
        return value

async def stream_users_csv_async(request):
    # Обратите внимание: у async view доступен request.user, но обращаться к БД напрямую нельзя (синхронно).
    # Поэтому будем получать данные через sync_to_async.
    queryset = User.objects.all()
    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)
    async def async_generate():
        # Заголовок CSV (синхронная операция writer.writerow, оборачиваем в синхронный вызов)
        yield writer.writerow(["ID", "Username", "Email"])
        # Итерируем по QuerySet асинхронно: кусками по 1000
        # Django не умеет просто async for по QuerySet, поэтому делаем манually:
        batch_size = 1000
        start = 0
        while True:
            # Выгружаем очередную порцию объектов в отдельном потоке, чтобы не блокировать event loop
            users_batch = await sync_to_async(list)(queryset[start:start+batch_size])
            if not users_batch:
                break
            for user in users_batch:
                # Генерируем строку CSV синхронно через writer (он быстренько отработает)
                csv_line = writer.writerow([user.id, user.username, user.email])
                yield csv_line
            start += batch_size
    response = StreamingHttpResponse(async_generate(), content_type="text/csv; charset=utf-8")
    response['Content-Disposition'] = 'attachment; filename="users_async.csv"'
    return response

Здесь основные изменения:

  • View объявлена как async def. Django выполнит её в event loop.

  • Мы не можем использовать .iterator() напрямую, потому что он вернёт синхронный генератор. Вместо этого, организуем выборку батчами: срезаем QuerySet. Внутри цикла используем await sync_to_async(list)(queryset[start:start+batch_size]) это выполнит срез (и загрузку объектов) в отдельном потоке (благодаря sync_to_async) и вернёт результат, не блокируя основной поток.

  • После получения списка объектов, перебираем их как обычно и yield CSV‑строки. Сам вызов writer.writerow всё ещё синхронный, но он очень быстрый (форматирование строки), поэтому можно делать его прямо так — это не будет ощутимой блокировкой. Если бы там была длительная операция, тоже завернули бы в sync_to_async.

  • Когда объектов больше нет, выходим из цикла.

В итоге async_generate() это асинхронный генератор (заметьте, async def + внутри есть yield). Мы передаём его в StreamingHttpResponse. Django поймёт, что это async‑итератор (поскольку view async) и будет итерировать по нему не блокируя другие задачи.

Каждый раз, когда генератор выполнит yield csv_line, кусок уйдёт клиенту, а если где‑то внутри был await (как у нас при выборке данных), на время ожидания этого await поток выполнения свободен.

Django не запускает несколько async_generate параллельно, он просто ждёт async for итерации. То есть внутри генератора наш sync_to_async(list) выполнится на отдельном треде, но для самого запроса это всё равно последовательная операция (просто неблокирующая event loop).

Поэтому общее время формирования ответа не сократится по сравнению с синхронным вариантом, мы просто освободим поток для других запросов во время ожидания I/O. Если нам нужно распараллелить генерацию самих данных, это выходит за рамки HTTP‑ответа — тут нужны фоновые задания или распределение по нескольким запросам.

Async-генератор для NDJSON

Аналогично можно адаптировать и NDJSON пример. Там проще, потому что мы напрямую использовали json.dumps (синхронный, но не тяжёлый). Всё упирается опять же в получение данных.

Если Django 5.x, можно попробовать async for user in User.objects.all().aiterator(chunk_size=1000). Есть асинхронный итератор QuerySet (на базе ORM прямиком пока не идеально, но с PostgreSQL и новым драйвером возможно работает).

Но более надёжный способ — тот же sync_to_async.

Пример:

async def stream_users_ndjson_async(request):
    queryset = User.objects.all()
    async def async_generate():
        start = 0
        batch = 1000
        while True:
            users_batch = await sync_to_async(list)(queryset[start:start+batch])
            if not users_batch:
                break
            for user in users_batch:
                data = {"id": user.id, "username": user.username, "email": user.email}
                yield json.dumps(data, ensure_ascii=False) + "\n"
            start += batch
    return StreamingHttpResponse(async_generate(), content_type="application/x-ndjson; charset=utf-8")

Логика та же, что и с CSV. В примере для краткости я опустил установку Content‑Disposition и прочих заголовков.

В асинхронных view Django всё равно разрешает обращаться к ORM, но автоматически обернёт вызовы в sync_to_async, что эквивалентно запуску в отдельном треде. Это так называемая встроенная защита от синхронных вызовов, Django не даст вам случайно блокировать event loop синхронной ORM, выбросив исключение. Поэтому лучше явно выносить такие штуки в sync_to_async, как мы сделали.

Полезные мелочи

Мы почти дошли до финала, но есть несколько нюансов при стриминге.

1. Буферизация прокси. Если ваше Django‑приложение работает за Nginx (или другим прокси‑сервером), по дефолту Nginx может буферизовать ответ. Он может начать получать от приложений кусочки и не отправлять клиенту, пока не накопится определённый объем или весь ответ. Это убьёт идею стриминга, пользователь снова будет ждать конца генерации. Чтобы этого избежать, нужно отключить буферизацию для конкретного location или ответа. В Nginx есть заголовок X-Accel-Buffering: no. Вы можете установить его в ответе:

response['X-Accel-Buffering'] = 'no'

Это укажет Nginx не буферизовать и слать сразу. Также полезно выставить Cache-Control: no-cache для стриминговых ответов, чтобы прокси не пытались их кешировать и не задерживали отправку.

2. GZip и прочие middleware. Помните, что некоторые Django middleware не дружат со стримингом. Например, сжатие GZip: раньше был баг, когда StreamingHttpResponse с unicode‑данными не работал через GZip.

Сейчас это исправлено, но в целом, любые middleware, которые хотят посмотреть или изменить весь контент, не смогут этого сделать, ведь контент мы не храним целиком. ETag, Content‑Length тоже не будут выставлены автоматически. Если вам нужны эти заголовки, скорее всего, стриминг вам не подходит, или придётся вычислять их самому (что обычно невозможно для больших данных заранее). Поэтому иногда нужно отключать или обходить некоторые middleware для стриминговых ответов. Например, можно не подключать GZipMiddleware, а сжимать данные вручную в генераторе (отдавать уже gzip‑байты и ставить Content-Encoding: gzip — но тогда вы не сможете строковый CSV просто так генерировать, тут уж сразу bytes).

3. Безопасность и ресурсы. Стриминг позволяет передавать большие объёмы, но не стоит злоупотреблять. Всегда думайте о нагрузке: если вы даёте выгрузку на миллион строк и у вас одновременно могут запросить это 10 пользователей, то в случае синхронного стриминга вы потратите 10 воркеров на долгое время. В случае асинхронного не заблокируете воркеры, но сама работа‑то делается: чтение из базы миллионов записей нагружает СУБД и CPU на сериализацию. Это может замедлить всю систему. Иногда имеет смысл внедрить ограничители, например, требовать специфических прав для таких выгрузок, ставить капчу (шутка) или как‑то предупреждать об их тяжести. Другой вариант генерировать файл в фоне и отдавать ссылку на него (но тогда и память нужна под хранение файла, и пользователю ждать). Каждая ситуация уникальна.

4. Контроль разъединения. При долгих стримах клиент может отвалиться (например, юзер закрыл браузер). Django предоставляет механизм отслеживать обрыв: у объекта ответа есть свойство streaming_content, при итерации можно ловить исключение RequestAborted. В документации есть пример обработки отключения клиента. В наших генераторах мы этого не делали, но вы можете завернуть генерацию в try/except. Обычно, если клиент ушёл, попытка отправить следующий chunk вызовет ConnectionError внутри, который Django обернёт. В принципе, достаточно в логах смотреть, если поток оборвался, Python‑приложение просто прекратит итерацию.

5. Выбор между FileResponse и StreamingHttpResponse. Если ваш большой файл уже существует на диске, и нужно лишь его отдать, не надо изобретать стриминг через yield. Django имеет FileResponse, оптимизированный для отдачи файлов. Он сам под капотом использует стриминг (через файловый дескриптор и отправку по частям). FileResponse хорош тем, что умеет работать через zero‑copy sendfile в некоторых серверах, или хотя бы через эффективное чтение большими блоками. Поэтому, если задача просто отдать уже готовый CSV/ZIP/видео — смело используйте FileResponse. StreamingHttpResponse нужен, когда вы генерируете содержимое динамически или читаете из источника, который не представлен одним файлом.

Надеюсь, и вам эти приёмы пригодятся в бою. Если у вас есть свои кейсы или вопросы по теме — добро пожаловать в комментарии. Спасибо за внимание и приятных случайностей.


Если вы работаете с Python и хотите системно углубить знания, обратите внимание на курс «Python Developer. Специализация». Программа построена вокруг реальных задач: проектирование архитектуры, работа с базами данных, оптимизация и тестирование кода.

Ближайшие открытые уроки:

Рост в IT быстрее с Подпиской — дает доступ к 3-м курсам в месяц по цене одного. Подробнее

Комментарии (0)