Команда Python for Devs подготовила перевод статьи о новых фоновых задачах в Django 6.0. Фреймворк наконец получил встроенный API для очередей задач — но без воркеров, так что чудес пока ждать рано. Автор показывает, зачем это обновление всё равно важно, как оно работает внутри и что можно построить поверх него уже сегодня.
В Django 6.0 появился встроенный фреймворк фоновых задач — django.tasks. Но пока что не стоит ждать, что вы сможете отказаться от Celery, Huey или любых других привычных решений.
В релиз-ноутсах это подчеркнуто напрямую:
Django умеет создавать и ставить задачи в очередь, но не предоставляет механизма воркеров для их выполнения. Исполнение должно обеспечиваться внешней инфраструктурой — отдельным процессом или сервисом.
Основная цель нового модуля django.tasks — дать единый API для реализации очередей задач. Инициатор этого улучшения — Джейк Ховард. Почитайте его введение на форуме Django.
Его референсная реализация, одновременно являющаяся бэкпортом для более ранних версий Django, доступна на GitHub как django-tasks.
Но мы это пока оставим и поиграем с минимальной версией, которая включена в Django 6.0. Напишем собственный бэкенд и воркер.
Наш проект: уведомления
Мы создадим приложение, отправляющее уведомления на телефоны и другие устройства с помощью ntfy.sh. (Я большой фанат этой штуки!)
Если хотите сразу нырнуть в код — финальный вариант проекта лежит на GitHub.
Чтобы отправить уведомление на телефон через ntfy, нужно всего лишь:
Создать топик
Установить приложение на телефон и войти
Отправлять HTTP-запросы на
https://ntfy.sh/<yourtopic>
Бесплатная версия предоставляет только публичные топики и сообщения. То есть любой человек, подписавшийся на топик, увидит всё, что вы туда отправляете. Для наших целей мы просто создадим топик со случайным именем — например, UUID.
Настройки проекта ожидают URL из шага 4 в переменной окружения. Например:
NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c
Вот наша функция, которая делает всю работу:
import httpx
from django.conf import settings
def send_notification(message: str, title: str | None):
# Pass the title if specified.
headers = {"title": title} if title else {}
httpx.post(
settings.NTFY_URL,
content=message,
headers=headers,
)
Вот и всё — после этого можно начинать отправлять и получать уведомления.
Краткое введение
Стоит пробежать глазами документацию Django по Task framework — но мы немного сэкономим вам время и сделаем короткое введение.
Определение задачи
Главная цель фреймворка — давать способ определять задачи с помощью стандартного API Django, без специфичных декораторов конкретной очереди или сторонних приёмов.
Вот так:
# ...
from django.tasks import task
@task
def send_notification(message: str, title: str | None):
# ...как раньше
Теперь наша функция — задача. Точнее, это django.tasks.Task.
Вы больше не можете вызывать send_notification напрямую. Задачи запускаются только через метод enqueue. Это может быть не тем поведением, которое вы ожидаете, но так задумано: это предотвращает случайный запуск задачи в текущем процессе вместо фонового исполнения.
Декоратор task позволяет указать приоритет задачи, имя очереди и имя бэкенда. Эти настройки можно переопределить методом using, который возвращает новый экземпляр django.tasks.Task.
Если нужен больший контроль, можно указать takes_context=True в декораторе и добавить context первым аргументом. Контекст даёт доступ к результату задачи, включая полезную информацию вроде числа попыток.
Нет способа задать число ретраев, бэкоффы и прочие штуки, которые вы ждёте от полноценной очереди задач. Но это и не задача django.tasks. При необходимости вы можете сами реализовать логику повторных попыток, изучая контекст задачи.
Добавление задачи в очередь
Просто:
task_result = send_notification.enqueue(
message="Season's greeting!",
title="Santa has something to tell you"
)
Выполнение задачи
Тут начинаются ограничения. Django 6.0 поставляется с ImmediateBackend и DummyBackend. Первый выполняет задачу сразу же, второй — не выполняет вовсе.
Поэтому в нашем проекте есть (демо) бэкенд на базе базы данных и воркер-процесс!
Получение результата
Если вы не ждёте результат сразу, можно запросить его позже по id. Просто вызовите get_result(result_id) у задачи.
В проекте есть view, который с помощью htmx периодически опрашивает статус выполнения.
UI проекта
Список под формой показывает результаты запусков нашей задачи. При отправке формы новый результат появляется сверху. Htmx продолжает опрашивать обновления, пока статус результата не станет FAILED или SUCCESSFUL.
def task_result(request, result_id, status):
result = send_notification.get_result(result_id)
if result.status == status:
# No need to swap the result.
return HttpResponse(status=204)
return TemplateResponse(request, "index.html#result", {"result": result})
Почему index.html#results? В Django 6.0 появились template partials. В этом случае view отправляет только частичный шаблон с именем result.
Под капотом
Когда вы декорируете функцию как task, вызывается task_class, указанный в конфигурации бэкенда. По умолчанию это django.task.Task.
Его метод enqueue вызывает метод enqueue сконфигурированного бэкенда.
Его метод get_result делает то же самое — вызывает соответствующий метод бэкенда.
Поскольку воркеров нет, это практически всё, что обязан делать бэкенд задач. Отлично — давайте добавим свой.
Бэкенд задач на базе БД
Наши цели:
простой бэкенд задач на базе БД
поддержка «автоматических» ретраев
Методы enqueue и get_result должны возвращать django.tasks.TaskResult. Это определяет минимальный набор данных, которые нужно хранить. Сохранять будем в модели Task.
Модели
Черновой вариант модели Task на основе свойств TaskResult и Task из django.tasks:
class Task(models.Model):
priority = models.IntegerField(default=0)
callable_path = models.CharField(max_length=255)
backend = models.CharField(max_length=200)
queue_name = models.CharField(max_length=100)
run_after = models.DateTimeField(null=True, blank=True)
takes_context = models.BooleanField(default=False)
# Stores args and kwargs
arguments = models.JSONField(null=True, blank=True)
status = models.CharField(
choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY
)
enqueued_at = models.DateTimeField()
started_at = models.DateTimeField(blank=True, null=True)
finished_at = models.DateTimeField(blank=True, null=True)
last_attempted_at = models.DateTimeField(blank=True, null=True)
return_value = models.JSONField(null=True, blank=True)
Чего не хватает? TaskResult включает список ошибок и id воркеров, которые обрабатывали задачу. Можно было бы проигнорировать.
Но TaskResult.attempts основан на количестве worker id. И если вы используете контекст задачи, вы наверняка полагаетесь на эту информацию.
Можно добавить эти данные в модель, используя JSONField, как сделано в референсной реализации.
Но мы пойдём явным путём и зададим модели и для этих сущностей. Мы будем записывать каждую попытку выполнения задачи и её ошибку, связывая их с задачей:
class Error(models.Model):
exception_class_path = models.TextField()
traceback = models.TextField()
class AttemptResultStatus(TextChoices):
FAILED = TaskResultStatus.FAILED
SUCCESSFUL = TaskResultStatus.SUCCESSFUL
class Attempt(models.Model):
task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE)
error = models.OneToOneField(
Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True
)
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID)
started_at = models.DateTimeField()
stopped_at = models.DateTimeField(blank=True, null=True)
status = models.CharField(
choices=AttemptResultStatus.choices, max_length=10, blank=True
)
Так у нас есть вся информация для выполнения задачи и полный набор данных для формирования TaskResult.
Но есть и требования со стороны воркера:
быстро получать список доступных задач
забирать одну из них
выполнять и отмечать результат
Можно сделать и на текущей модели, но слегка улучшим структуру:
class Task(models.Model):
# ...
available_after = models.DateTimeField()
attempt_count = models.IntegerField(default=0)
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True)
# ...
available_after — время, когда задачу можно запускать. Если run_after указан, используем его. Иначе — текущее UTC-время.
Для ретраев available_after обновляется на следующий момент времени. То есть мы можем делать бэкофф.
attempt_count упрощает поиск задач. Если attempt_count превышает максимальное число попыток — задачу можно игнорировать.
worker_id заполняется при закреплении задачи за воркером.
Постановка в очередь и получение результата
Поставить задачу в очередь просто: создать экземпляр Task из dataclass Task, сохранить — готово. ID TaskResult — строковое значение первичного ключа модели.
Получение результата — загрузить задачу и попытки, собрать TaskResult.
Упрощённая версия нашего бэкенда:
class DatabaseBackend(BaseTaskBackend):
supports_defer = True
supports_async_task = False
supports_get_result = True
supports_priority = True
def enqueue(self, task: Task, args, kwargs):
self.validate_task(task)
model = self.queue_store.enqueue(task, args, kwargs)
task_result = TaskResult(
task=task,
id=str(model.pk),
# ...
)
return task_result
def get_result(self, result_id):
return self.model_to_result(
self.queue_store.get(result_id)
)
def model_to_result(self, model: models.Task) -> TaskResult:
...
Большая часть логики вынесена в queue_store. Давайте сначала разберём конфигурацию.
Конфигурация
Нам нужны параметры по умолчанию:
максимальное число попыток
коэффициент бэкоффа —
math.pow(factor, attempts)
Они могут настраиваться для каждой очереди. Получается что-то вроде:
TASKS = {
"default": {
"BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend",
"OPTIONS": {
"queues": {
"low_priority": {
"max_attempts": 5,
}
},
"max_attempts": 10,
"backoff_factor": 3,
"purge": {"finished": "10 days", "unfinished": "20 days"},
},
}
}
Задачи low_priority будут иметь 5 попыток и коэффициент бэкоффа 3. Остальные — 10 попыток.
Queue store
QueueStore — вспомогательный класс нашего бэкенда. Он отвечает за:
получение задач
постановку в очередь
проверку доступных задач
закрепление задач
И главное — упрощает работу воркера: у воркера своя копия QueueStore, ограниченная нужными очередями.
Воркер
Задача воркера — сообщать раннеру о наличии задач и запускать их обработку через бэкенд. Выглядит так:
class Worker:
def __init__(...):
# ...
def has_more(self) -> bool:
return self.queue_store.has_more()
def process(self):
with transaction.atomic():
tm = self.queue_store.claim_first_available(worker_id=self.id)
if tm is not None:
self.backend.process_task(tm)
Чтобы получить работающего воркера, нужно:
Создать экземпляр воркера
Спросить
has_moreЕсли есть задачи — выполнить process
Если нет — подождать
Повторить
Команда dbtasks_worker делает именно это.
Захват задачи
QueueStore предоставляет peek, который возвращает id наиболее «срочной» задачи. Затем мы пытаемся взять задачу по этому id.
Упрощённая версия claim_first_available:
def claim_first_available(self, worker_id: str, attempts: int = 3):
qs = models.Task.filter(
worker_id="",
status=TaskResultStatus.READY,
)
for _ in range(attempts):
task_id = self.peek()
if not task_id:
return None
count = qs.filter(pk=task_id).update(
worker_id=self.id_,
status=TaskResultStatus.RUNNING,
)
if count:
return models.Task.objects.get(pk=task_id)
return None
Если count == 0 — задачу успел забрать другой воркер. Тогда пробуем снова.
Обработка задачи
process_task делает следующее:
создаёт
AttemptиTaskResultвыполняет задачу, ловит
BaseExceptionили сохраняет результатотмечает успех или неудачу
при неудаче проверяет, можно ли повторить
Если хотите детали — смотрите репозиторий.
Вот и всё
Этот демо-проект не затрагивает всё, о чём стоит серьёзно подумать: сигналы для воркера, транзакции в БД… Всё это возможно, просто не входило в цель статьи.
Появление django.tasks наверняка вызовет появление новых библиотек или адаптеров для существующих очередей. И, конечно, скоро кто-нибудь скажет, что django.tasks слишком ограничен.
И правда: если вы используете продвинутые возможности своей очереди задач, вам тут многого может не хватить.
Сложная оркестрация
Некоторые очереди задач, например Celery, позволяют комбинировать задачи: прогонять список задач, передавать результат одной в другую и т.д.
django.tasks откровенно не про это — и это нормально. Унифицировать такой API невозможно. Я сам хлебнул проблем с библиотеками, утверждающими, что умеют это.
Ретреи
Как уже упоминалось, автоматические ретреи есть только если ваш бэкенд сам это умеет. Как наш.
Можно сделать и вручную, например через декоратор:
def retry(func):
@functools.wraps(func)
def wrapper(context: TaskContext, *args, **kwargs):
try:
return func(context, *args, **kwargs)
except BaseException as e:
result = context.task_result
backoff = math.pow(2, result.attempts)
run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff)
result.task.using(run_after=run_after).enqueue(*args, **kwargs)
raise e
return wrapper
@task(takes_context=True)
@retry
def send_email(context: TaskContext, to: str, subject: str, body: str):
# Do your thing
...
Настоящий механизм воркеров
Да, в Django его нет. Но референсная реализация предоставляет вполне рабочих воркеров. Потерпите — или, ещё лучше, помогите проекту!
Идеальных решений нет
Думаю, django.tasks со временем покроет как минимум 80% типовых сценариев. Да, его API простой и ограниченный — но по мне это плюс, а не минус. Это, пожалуй, максимально близкий к стандартизированному подход.
Русскоязычное сообщество про Python

Друзья! Эту статью подготовила команда Python for Devs — канала, где каждый день выходят самые свежие и полезные материалы о Python и его экосистеме. Подписывайтесь, чтобы ничего не пропустить!