Это двадцать вторая часть серии мега-учебника Flask, в которой я собираюсь рассказать вам, как создавать фоновые задания, которые выполняются независимо от веб-сервера.

Оглавление

Эта глава посвящена реализации длительных или сложных процессов, которые необходимо запускать как часть приложения. Эти процессы не могут выполняться синхронно в контексте запроса, потому что это заблокировало бы ответ клиенту на время выполнения задачи. Я кратко затронул эту тему в главе 10, когда перенес отправку электронных писем в фоновые потоки, чтобы клиенту не приходилось ждать в течение 3-4 секунд, которые требуются для отправки электронного письма. Хотя использование потоков для электронных писем приемлемо, это решение плохо масштабируется, когда рассматриваемые процессы намного длиннее. Общепринятой практикой является передача длинных задач рабочему процессу или, что более вероятно, их пулу.

Чтобы обосновать необходимость выполнения длительных задач, я собираюсь представить функцию экспорта в Microblog, с помощью которой пользователи смогут запрашивать файл данных со всеми своими записями в блоге. Когда пользователь использует эту опцию, приложение запускает задачу экспорта, которая сгенерирует файл JSON со всеми публикациями пользователя, а затем отправит его пользователю по электронной почте. Все эти действия будут выполняться в рабочем процессе, и пока это происходит, пользователь увидит уведомление, показывающее процент завершения.

Ссылки на GitHub для этой главы: BrowseZipDiff.

Введение в очереди задач

Очереди задач предоставляют приложению удобное решение для выполнения задачи рабочим процессом. Рабочие процессы выполняются независимо от приложения и даже могут быть расположены в другой системе. Связь между приложением и рабочими элементами осуществляется через очередь сообщений. Приложение отправляет задание, а затем отслеживает его ход, взаимодействуя с очередью. На следующей диаграмме показана типичная реализация:

Самая популярная очередь задач для Python - это Celery. Это довольно сложный пакет, который имеет множество опций и поддерживает несколько очередей сообщений. Другая популярная очередь задач Python - это очередь Redis или просто RQ, которая жертвует некоторой гибкостью, например, поддерживает только очередь сообщений Redis, но в взамен ее настроить намного проще, чем Celery.

И Celery, и RQ идеально подходят для поддержки фоновых задач в приложении Flask, поэтому мой выбор в пользу этого приложения будет в пользу простоты RQ. Однако реализовать ту же функциональность с помощью Celery должно быть относительно легко. Если Celery интересует вас больше, чем RQ, вы можете прочитать статью о использовании Celery с Flask, которая есть в моем блоге.

Использование RQ

RQ - это стандартный пакет Python, который устанавливается с pip:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

Как я упоминал ранее, обмен данными между приложением и рабочими процессами RQ будет осуществляться в очереди сообщений Redis, поэтому вам необходимо запустить сервер Redis. Существует множество вариантов установки и запуска сервера Redis, от программы установки в один клик до загрузки исходного кода и его компиляции непосредственно в вашей системе. Если вы используете Windows, у Microsoft есть установщики здесь. В Linux вы, вероятно, можете получить его в виде пакета через диспетчер пакетов вашей операционной системы, а пользователи macOS могут запустить brew install redis, а затем запустить службу вручную с помощью команды redis-server.

Вам вообще не нужно будет взаимодействовать с Redis, кроме как убедиться, что служба запущена и доступна для RQ.

Обратите внимание, что RQ не запускается в собственном интерпретаторе Python для Windows. Если вы используете платформу Windows, вы можете запускать RQ только под эмуляцией Unix. Два уровня эмуляции Unix, которые я рекомендую пользователям Windows, - это Cygwin и подсистема Windows для Linux (WSL), и оба они совместимы с RQ.

Создание задачи

Я собираюсь показать вам, как запустить простую задачу через RQ, чтобы вы с ней ознакомились. Задача - это не что иное, как функция Python. Вот пример задачи, которую я собираюсь включить в новый модуль app/tasks.py:

app/tasks.py: Пример фоновой задачи.

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

Эта задача получает некоторое количество секунд в качестве аргумента, а затем ожидает это количество времени, печатая счетчик раз в секунду.

Запуск RQ Worker

Теперь, когда задача готова, рабочий процесс может приступить к работе. Это делается с помощью команды rq worker:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

Рабочий процесс теперь подключен к Redis и следит за любыми заданиями, которые могут быть назначены ему в очереди с именем microblog-tasks. В случаях, когда вы хотите, чтобы рабочие процессы имели большую пропускную способность, все, что вам нужно сделать, это запустить больше экземпляров rq worker, подключенных к одной очереди. Затем, когда задание появляется в очереди, любой из доступных рабочих процессов забирает его. В производственной среде вы, вероятно, захотите иметь по крайней мере столько рабочих процессов, сколько доступно ядер процессора.

Выполнение задач

Теперь откройте второе окно терминала и активируйте в нем виртуальную среду. Я собираюсь использовать сеанс командной оболочки для запуска задачи example() в рабочем процессе:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

Класс Queue из RQ представляет очередь задач, видимую со стороны приложения. В качестве аргументов он принимает имя очереди и объект подключения Redis, который в данном случае я инициализирую URL-адресом по умолчанию. Если ваш сервер Redis запущен на другом хостинге или с другим номером порта, вам нужно будет использовать другой URL.

Метод enqueue() используется для добавления задания в очередь. Первый аргумент - это имя задачи, которую вы хотите выполнить, заданное непосредственно как объект функции или как строка импорта. Я нахожу опцию string намного более удобной, поскольку это избавляет от необходимости импортировать функцию на стороне приложения. Все оставшиеся аргументы, указанные в enqueue(), будут переданы функции, запущенной в рабочем процессе.

Как только вы выполните вызов enqueue(), вы заметите некоторую активность в первом окне вашего терминала, в котором запущен рабочий процесс RQ. Вы увидите, что функция example() теперь запущена и печатает счетчик один раз в секунду. В то же время ваш другой терминал не заблокирован, и вы можете продолжать работать в командной оболочке. В приведенном выше примере я вызвал метод job.get_id() для получения уникального идентификатора, присвоенного задаче. Еще одно интересное выражение, которое вы можете попробовать с объектом job, - это проверить, завершилось ли выполнение функции в объекте .:

>>> job.is_finished
False

Если вы передали 23 как я сделал в моем примере выше, то функция будет выполняться около 23 секунд. По истечении этого времени свойство job.is_finished примет значение True. Разве это не круто? Мне действительно нравится простота RQ.

После завершения работы функции рабочий модуль возвращается к ожиданию новых заданий, поэтому вы можете повторить вызов enqueue() с другими аргументами, если хотите поэкспериментировать еще. Данные, которые хранятся в очереди относительно задачи, останутся там на некоторое время (по умолчанию 500 секунд), но в конечном итоге будут удалены. Это важно, так как очередь задач не сохраняет историю выполненных заданий.

Отчет о ходе выполнения задачи

Пример задачи, который я использовал выше, нереально прост. Обычно для длительно выполняющейся задачи требуется, чтобы какая-то информация о ходе выполнения была доступна приложению, которое, в свою очередь, может показывать ее пользователю. RQ поддерживает это, используя атрибут meta объекта job. Позвольте мне переписать задачу example() на написание отчетов о проделанной работе:

app/tasks.py: Пример фоновой задачи с ходом выполнения.

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

Эта новая версия example() использует функцию RQ get_current_job() для получения экземпляра задания, аналогичного тому, который возвращается приложению при отправке задания. Атрибут meta объекта job - это словарь, в который задача может записывать любые пользовательские данные, которые она хочет передать приложению. В этом примере я добавляю параметр progress, который представляет процент выполнения задачи. Каждый раз, когда обновляется ход выполнения, я вызываю job.save_meta(), чтобы дать указание RQ записать данные в Redis, где приложение сможет их найти.

На стороне приложения (в настоящее время это просто оболочка Python) я могу запустить эту задачу, а затем отслеживать прогресс следующим образом:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

Как вы можете видеть выше, на этой стороне атрибут meta доступен для чтения. Для обновления содержимого из Redis необходимо вызвать метод refresh().

Представление задач в базе данных

В приведенном выше примере было достаточно запустить задачу и посмотреть, как она выполняется. Для веб-приложения все становится немного сложнее, потому что как только одна из этих задач запускается как часть запроса, этот запрос завершается, и весь контекст для этой задачи будет потерян. Поскольку я хочу, чтобы приложение отслеживало, какие задачи выполняет каждый пользователь, мне нужно использовать таблицу базы данных для поддержания некоторого состояния. Ниже вы можете увидеть реализацию модели Task:

app/models.py: Модель задач.

# ...
import redis
import rq

class User(UserMixin, db.Model):
    # ...
    tasks: so.WriteOnlyMapped['Task'] = so.relationship(back_populates='user')

# ...

class Task(db.Model):
    id: so.Mapped[str] = so.mapped_column(sa.String(36), primary_key=True)
    name: so.Mapped[str] = so.mapped_column(sa.String(128), index=True)
    description: so.Mapped[Optional[str]] = so.mapped_column(sa.String(128))
    user_id: so.Mapped[int] = so.mapped_column(sa.ForeignKey(User.id))
    complete: so.Mapped[bool] = so.mapped_column(default=False)

    user: so.Mapped[User] = so.relationship(back_populates='tasks')

    def get_rq_job(self):
        try:
            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None
        return rq_job

    def get_progress(self):
        job = self.get_rq_job()
        return job.meta.get('progress', 0) if job is not None else 100

Интересное отличие этой модели от предыдущих заключается в том, что поле id для первичного ключа представляет собой строку, а не целое число. Это потому, что для этой модели я не собираюсь полагаться на собственную генерацию первичного ключа базы данных, а вместо этого я собираюсь использовать идентификаторы заданий, сгенерированные RQ.

Модель будет хранить полное имя задачи (переданное в RQ), описание задачи, подходящее для показа пользователям, связь с пользователем, запросившим задачу, и логическое значение, указывающее, выполнена задача или нет. Назначение поля complete - отделять завершившиеся задачи от тех, которые активно выполняются, поскольку выполняемые задачи требуют специальной обработки для отображения обновлений хода выполнения.

Метод get_rq_job() - это вспомогательный метод, который загружает экземпляр RQ Job по заданному идентификатору задачи, который я могу получить из модели. Это делается с помощью Job.fetch(), которая загружает экземпляр Job на основе данных о нем, существующих в Redis. Метод get_progress() строится поверх get_rq_job() и возвращает процент выполнения задачи. У этого метода есть пара интересных предположений. Если идентификатор задания из модели не существует в очереди RQ, это означает, что задание уже завершено, а срок действия данных истек и они были удалены из очереди, поэтому в этом случае возвращаемый процент равен 100. С другой стороны, если задание существует, но нет информации, связанной с атрибутом meta, то можно с уверенностью предположить, что выполнение задания запланировано, но у него еще не было возможности запуститься, поэтому в этой ситуации 0 возвращается как прогресс.

Чтобы применить изменения к схеме базы данных, необходимо сгенерировать новую миграцию, а затем обновить базу данных:

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

Новую модель также можно добавить в контекст оболочки, чтобы сделать ее доступной в сеансах оболочки без необходимости ее импорта:

microblog.py: Добавление модели задачи в контекст оболочки.

import sqlalchemy as sa
import sqlalchemy.orm as so
from app import create_app, db
from app.models import User, Post, Message, Notification, Task

app = create_app()

@app.shell_context_processor
def make_shell_context():
    return {'sa': sa, 'so': so, 'db': db, 'User': User, 'Post': Post,
            'Message': Message, 'Notification': Notification, 'Task': Task}

Интеграция RQ с приложением Flask

URL-адрес подключения к службе Redis необходимо добавить в конфигурацию:

class Config:
    # ...
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

Как всегда, URL-адрес подключения Redis будет получен из переменной окружения, и если переменная не определена, будет использоваться URL-адрес по умолчанию, который предполагает, что служба запущена на том же хосте и через порт по умолчанию.

Функция фабрики приложений будет отвечать за инициализацию Redis и RQ:

app/__init__.py: Интеграция с RQ.

# ...
from redis import Redis
import rq

# ...

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

    # ...

Объект app.task_queue будет очередью, в которую отправляются задачи. Прикрепление очереди к приложению удобно, потому что я могу использовать любое место в приложении для доступа к ней через current_app.task_queue. Чтобы упростить отправку или проверку задачи любой частью приложения, я могу создать несколько вспомогательных методов в модели User:

app/models.py: Вспомогательные методы выполнения задач в пользовательской модели.

# ...

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue(f'app.tasks.{name}', self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        query = self.tasks.select().where(Task.complete == False)
        return db.session.scalars(query)

    def get_task_in_progress(self, name):
        query = self.tasks.select().where(Task.name == name,
                                          Task.complete == False)
        return db.session.scalar(query)

Метод launch_task() заботится об отправке задачи в очередь RQ, а также о добавлении ее в базу данных. Аргументом name является имя функции, как она определена в app/tasks.py. При отправке в RQ функция добавляет к этому имени app.tasks., чтобы получить полное имя функции. Аргумент description - это понятное описание задачи, которое может быть представлено пользователям. Для функции, экспортирующей записи из блога, я установлю имя равным export_posts, а описание - Exporting posts.... Остальные аргументы - это позиционные и ключевые аргументы, которые будут переданы задаче. Функция начинается с вызова метода очереди enqueue() для отправки задания. Возвращаемый объект задания содержит идентификатор задачи, присвоенный RQ, поэтому я могу использовать его для создания соответствующего объекта Task в моей базе данных.

Обратите внимание, что launch_task() добавляет новый объект task в сеанс, но не выполняет фиксацию. В общем, лучше всего работать с сеансом базы данных в функциях более высокого уровня, поскольку это позволяет объединить несколько обновлений, выполненных функциями более низкого уровня, в одной транзакции. Это не строгое правило, и фактически, позже в этой главе вы увидите исключение, когда в дочерней функции выполняется фиксация.

Метод get_tasks_in_progress() возвращает полный список задач, которые необходимы пользователю. Позже вы увидите, что я использую этот метод для включения информации о запущенных задачах на страницы, которые отображаются пользователю.

Наконец, get_task_in_progress() это более простая версия предыдущей, которая возвращает конкретную задачу. Я запрещаю пользователям запускать две или более задач одного типа одновременно, поэтому перед запуском задачи я могу использовать этот метод, чтобы узнать, выполняется ли в данный момент предыдущая задача.

Отправка электронных писем из задачи RQ

Это может показаться отвлекающим маневром от основной темы, но я сказал выше, что по завершении задачи фонового экспорта пользователю будет отправлено электронное письмо с файлом JSON, содержащим все записи. Функциональность электронной почты, которую я построил в главе 11, необходимо расширить двумя способами. Во-первых, мне нужно добавить поддержку вложенных файлов, чтобы я мог прикреплять JSON-файл. Во-вторых, функция send_email() всегда отправляет электронные письма асинхронно, используя фоновый поток. Когда я собираюсь отправить электронное письмо из фоновой задачи, которая уже является асинхронной, наличие фоновой задачи второго уровня на основе потока имеет мало смысла, поэтому мне нужно поддерживать как синхронную, так и асинхронную отправку электронной почты.

К счастью, Flask-Mail поддерживает вложения, поэтому все, что мне нужно сделать, это расширить функцию send_email(), чтобы принимать их в качестве дополнительного аргумента, а затем настроить их в объекте Message. И чтобы при необходимости отправить электронное письмо в основном потоке, мне просто нужно добавить логический аргумент sync:

app/email.py: Отправка электронных писем с вложениями.

# ...

def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
               args=(current_app._get_current_object(), msg)).start()

Метод attach() класса Message принимает три аргумента, которые определяют вложение: имя файла, тип носителя и фактические данные файла. Имя файла - это просто имя, которое получатель увидит связанным с вложением, оно не обязательно должно быть реальным именем файла. Тип носителя определяет, что это за вложение, что помогает читателям электронной почты отображать его соответствующим образом. Например, если вы отправляете в качестве типа носителя image/png, программа чтения электронной почты будет знать, что вложение представляет собой изображение, и в этом случае она может отобразить его как таковое. Для файла содержащего записи из блога я собираюсь использовать формат JSON, в котором используется тип носителя application/json. Третий и последний аргумент - это строка или последовательность байтов с содержимым вложения.

Чтобы упростить, аргумент attachments в функции send_email() будет списком кортежей, и каждый кортеж будет содержать три элемента, которые соответствуют трем аргументам в attach(). Итак, для каждого элемента в этом списке мне нужно отправить кортеж в качестве аргументов в attach(). В Python, если у вас есть список или кортеж с аргументами, которые вы хотите отправить функции, вы можете использовать конструкцию func(*args), чтобы раскрыть этот список до фактического списка аргументов, вместо того, чтобы использовать более утомительный синтаксис, такой как func(args[0], args[1], args[2]). Итак, например, если у вас есть args = [1, 'foo'], вызов отправит два аргумента, такие же, как если бы вы вызывали func(1, 'foo'). Без * у вызова был бы единственный аргумент, которым был бы список.

Что касается синхронной отправки электронного письма, то мне нужно было просто вернуться к прямому вызову mail.send(msg), когда sync равен True.

Помощники по задачам

В то время как задача example(), которую я использовал выше, была простой автономной функцией, функции, которая экспортирует записи в блог, потребуются некоторые функциональные возможности, которые есть у меня в приложении, такие как доступ к базе данных и функция отправки электронной почты. Поскольку это будет выполняться в отдельном процессе, мне нужно инициализировать Flask-SQLAlchemy и Flask-Mail, которым, в свою очередь, нужен экземпляр приложения Flask, из которого можно получить их конфигурацию. Итак, я собираюсь добавить экземпляр приложения Flask и контекст приложения в верхней части модуля app/tasks.py:

app/tasks.py: Создание приложения и контекста.

from app import create_app

app = create_app()
app.app_context().push()

Приложение создано в этом модуле, потому что это единственный модуль, который рабочий процесс RQ собирается импортировать. Когда вы используете команду flask, модуль microblog.py в корневом каталоге создает приложение, но рабочий процесс RQ ничего об этом не знает, поэтому ему необходимо создать свой собственный экземпляр приложения, если это необходимо для выполнения задачи. Вы уже видели метод app.app_context() в нескольких местах, активация контекста делает приложение "текущим" экземпляром приложения, и это позволяет использовать расширения, такие как Flask-SQLAlchemy, current_app.config для получения их конфигурации. Без контекста выражение current_app возвращало бы ошибку.

Затем я начал думать о том, как я собираюсь сообщать о ходе выполнения этой функции. В дополнение к передаче информации о ходе выполнения через словарь job.meta, я хотел бы отправлять клиенту push-уведомления, чтобы процент выполнения мог обновляться динамически без необходимости обновления страницы пользователем. Для этого я собираюсь использовать механизмы уведомлений, которые я разработал в главе 21. Обновления будут работать очень похоже на значок непрочитанных сообщений. Когда сервер отрисовывает шаблон, он будет включать "статическую" информацию о ходе выполнения, полученную из job.meta, но затем, как только страница появится в браузере клиента, уведомления будут динамически обновлять процентное соотношение. Из-за уведомлений обновление хода выполнения запущенной задачи будет несколько сложнее, чем то, как я делал это в предыдущем примере, поэтому я собираюсь создать функцию-оболочку, предназначенную для обновления хода выполнения:

app/tasks.py: Настройка хода выполнения задачи.

from rq import get_current_job
from app import db
from app.models import Task

# ...

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = db.session.get(Task, job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

Задача экспорта может вызывать функцию _set_task_progress() для записи процента выполнения. Функция сначала записывает процентное значение в словарь job.meta и сохраняет его в Redis, затем загружает соответствующий объект задачи из базы данных и использует task.user для отправки уведомления пользователю, запросившему задачу, используя существующий метод add_notification(). Уведомлению будет присвоено имя task_progress, а данные, связанные с ним, будут представлять собой словарь с двумя элементами: идентификатором задачи и процентом выполнения. Позже я добавлю код JavaScript для работы с этим новым типом уведомлений.

Функция проверяет, указывает ли прогресс на то, что функция завершена, и в этом случае также обновляет атрибут complete объекта задачи в базе данных. Вызов фиксации базы данных гарантирует, что задача и объект уведомления, добавленные add_notification(), будут немедленно сохранены в базе данных. Мне нужно было быть очень осторожным при разработке родительской задачи, чтобы не вносить никаких изменений в базу данных, поскольку этот вызов фиксации изменений также запишет их.

Реализация задачи экспорта

Теперь все готово для написания функции экспорта. Высокоуровневая структура этой функции будет следующей:

app/tasks.py: Общая структура экспорта записей.

def export_posts(user_id):
    try:
        # read user posts from database
        # send email with data to user
    except Exception:
        # handle unexpected errors
    finally:
        # handle clean up

Зачем заключать всю задачу в блок try/except? Код приложения, существующий в обработчиках запросов, защищен от непредвиденных ошибок, потому что Flask сам улавливает исключения и затем обрабатывает их, соблюдая все обработчики ошибок и конфигурацию ведения журнала, которые я настроил для приложения. Однако эта функция будет выполняться в отдельном процессе, который управляется RQ, а не Flask, поэтому при возникновении каких-либо непредвиденных ошибок задача будет прервана, RQ выведет сообщение об ошибке на консоль, а затем вернется к ожиданию новых заданий. Таким образом, в принципе, если вы не просматриваете выходные данные рабочего процесса RQ или не записываете их в файл, вы никогда не обнаружите, что произошла ошибка.

Давайте начнем рассмотрение разделов, указанных в комментариях выше, с самых простых, которые касаются обработки ошибок и очистки в конце:

app/tasks.py: Обработка ошибок при экспорте записей.

import sys
# ...

def export_posts(user_id):
    try:
        # ...
    except Exception:
        _set_task_progress(100)
        app.logger.error('Unhandled exception', exc_info=sys.exc_info())
    finally:
        _set_task_progress(100)

Всякий раз, когда возникает непредвиденная ошибка, я собираюсь использовать объект logger из приложения Flask для регистрации ошибки вместе с трассировкой стека, информацией, которая предоставляется вызовом sys.exc_info(). Преимущество использования приложений Flask для регистрации ошибок заключается в том, что будут соблюдены все механизмы ведения журнала, которые вы реализовали для приложения Flask. Например, в главе 7 я настроил отправку ошибок на адрес электронной почты администратора. Просто используя app.logger, я получаю такое же поведение для этих ошибок. В блоке finally, который будет выполняться как при ошибочных, так и при успешных запусках, я отмечаю задачу как завершенную, устанавливая прогресс на 100%.

Далее я собираюсь написать фактический экспорт, который просто отправляет запрос к базе данных и циклически просматривает результаты, накапливая их в словаре:

app/tasks.py: Чтение сообщений пользователя из базы данных.

import time
from app.models import User, Post

# ...

def export_posts(user_id):
    try:
        user = db.session.get(User, user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = db.session.scalar(sa.select(sa.func.count()).select_from(
            user.posts.select().subquery()))
        for post in db.session.scalars(user.posts.select().order_by(
                Post.timestamp.asc())):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

        # send email with data to user
    except Exception:
        # ...
    finally:
        # ...

Для каждого поста функция будет добавлять словарь с двумя элементами: телом поста и временем написания поста. Время будет указано в стандарте ISO 8601. Объекты Python datetime, которые я использую, не хранят часовой пояс, поэтому после экспорта времени в формат ISO я добавляю 'Z', который указывает UTC.

Код немного усложняется из-за необходимости отслеживать прогресс. Я поддерживаю счетчик i, и мне нужно выполнить дополнительный запрос к базе данных, прежде чем я войду в цикл, чтобы получить количество записей в total_posts. Используя i и total_posts, каждая итерация цикла может обновлять ход выполнения задачи значением от 0 до 100.

Возможно, вы заметили, что я также добавил вызов time.sleep(5) на каждой итерации цикла. Основная причина, по которой я добавил режим ожидания, заключается в том, чтобы продлить выполнение задачи экспорта и иметь возможность видеть прогресс, даже если экспорт охватывает всего несколько записей в блоге.

Ниже вы можете увидеть последнюю часть функции, которая отправляет пользователю электронное письмо со всей информацией, собранной в data в виде вложения:

app/tasks.py: Сообщения по электронной почте пользователю.

import json
from flask import render_template
from app.email import send_email

# ...

def export_posts(user_id):
    try:
        # ...

        send_email(
            '[Microblog] Your blog posts',
            sender=app.config['ADMINS'][0], recipients=[user.email],
            text_body=render_template('email/export_posts.txt', user=user),
            html_body=render_template('email/export_posts.html', user=user),
            attachments=[('posts.json', 'application/json',
                          json.dumps({'posts': data}, indent=4))],
            sync=True)
    except Exception:
        # ...
    finally:
        # ...

Это просто вызов функции send_email(). Вложение определяется как кортеж из трех элементов, которые затем передаются методу attach() объекта Flask-Mail Message. Третий элемент в кортеже - это содержимое вложения, которое генерируется с помощью функции Python json.dumps().

Здесь упоминается пара новых шаблонов, которые предоставляют содержимое тела электронного письма в виде обычного текста и HTML-формы. Вот текстовый шаблон:

app/templates/email/export_posts.txt: Текстовый шаблон электронного письма.

Dear {{ user.username }},

Please find attached the archive of your posts that you requested.

Sincerely,

The Microblog Team

Вот HTML-версия электронного письма:

app/templates/email/export_posts.html: HTML-шаблон электронного письма.

<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>

Функция экспорта в приложении

Все основные компоненты для поддержки фоновых задач экспорта теперь на месте. Осталось подключить эту функциональность к приложению, чтобы приложение могло отправлять пользователям по электронной почте их публикации.

Ниже вы можете увидеть новую функцию просмотра export_posts:

app/main/routes.py: Маршрут экспорта записей и функция просмотра.

@bp.route('/export_posts')
@login_required
def export_posts():
    if current_user.get_task_in_progress('export_posts'):
        flash(_('An export task is currently in progress'))
    else:
        current_user.launch_task('export_posts', _('Exporting posts...'))
        db.session.commit()
    return redirect(url_for('main.user', username=current_user.username))

Функция сначала проверяет, есть ли у пользователя невыполненная задача экспорта, и в этом случае просто выдает сообщение. На самом деле нет смысла иметь две задачи экспорта для одного и того же пользователя одновременно, поэтому это предотвращается. Я могу проверить наличие этого условия, используя метод get_task_in_progress(), который я реализовал ранее.

Если пользователь еще не запустил экспорт, то вызывается функция launch_task(). Первый аргумент - это имя функции, которая будет передана рабочему процессу RQ, с префиксом app.tasks.. Второй аргумент - это просто понятное текстовое описание, которое будет показано пользователю. Оба значения записываются в объект Task в базе данных. Функция завершается перенаправлением на страницу профиля пользователя.

Теперь мне нужно предоставить ссылку на этот маршрут, к которому пользователь может получить доступ для запроса экспорта. Я думаю, что наиболее подходящее место для этого - страница профиля пользователя, где ссылка может быть показана только тогда, когда пользователи просматривают свою собственную страницу, прямо под ссылкой "Редактировать свой профиль":

app/templates/user.html: Ссылка на экспорт на странице профиля пользователя.

...
<p>
    <a href="{{ url_for('main.edit_profile') }}">
        {{ _('Edit your profile') }}
    </a>
</p>
{% if not current_user.get_task_in_progress('export_posts') %}
<p>
     <a href="{{ url_for('main.export_posts') }}">
        {{ _('Export your posts') }}
    </a>
</p>
...
{% endif %}

Эта ссылка привязана к условию, потому что я не хочу, чтобы она появлялась, когда пользователь уже выполняет экспорт.

На этом этапе фоновые задания должны работать, но без предоставления какой-либо обратной связи пользователю. Если вы хотите попробовать это, вы можете запустить приложение и рабочий процесс RQ следующим образом:

  • Убедитесь, что у вас запущен Redis

  • В первом окне терминала запустите один или несколько экземпляров рабочих процессов RQ. Для этого вы должны использовать команду rq worker microblog-tasks

  • Во втором окне терминала запустите приложение Flask с помощью flask run (не забудьте сначала установить FLASK_APP)

Уведомления о ходе выполнения

Чтобы завершить работу над этой функцией, я хочу информировать пользователя о выполнении фоновой задачи, включая процент завершения. Просматривая параметры компонента Bootstrap, я решил использовать для этого оповещение под панелью навигации. Оповещения - это цветные горизонтальные полосы, которые отображают информацию для пользователя. Синии окна уведомлений я использую для отображения всплывающих сообщений. Теперь я собираюсь добавить зеленые, чтобы показывать статус выполнения. Ниже вы можете увидеть, как это будет выглядеть:

Следующая глава => app/templates/base.html: Экспорт оповещения о ходе выполнения в базовый шаблон.

...
{% block content %}
    <div class="container">
        {% if current_user.is_authenticated %}
        {% with tasks = current_user.get_tasks_in_progress() %}
        {% if tasks %}
            {% for task in tasks %}
            <div class="alert alert-success" role="alert">
                {{ task.description }}
                <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
            </div>
            {% endfor %}
        {% endif %}
        {% endwith %}
        {% endif %}
        ...
{% endblock %}
...

Метод отображения уведомлений о задачах практически идентичен отображаемым сообщениям. Внешнее условие пропускает всю разметку, связанную с уведомлением, когда пользователь не вошел в систему. Для зарегистрированных пользователей я получаю список текущих задач, вызывая метод get_tasks_in_progress(), который я создал ранее. В текущей версии приложения я получу максимум один результат, поскольку я не разрешаю более одного активного экспорта одновременно, но в будущем я, возможно, захочу поддерживать другие типы задач, которые могут сосуществовать, поэтому написание этого в общем виде может сэкономить мне время позже.

Для каждой задачи я добавляю на страницу элемент оповещения. Цвет оповещения регулируется с помощью второго стиля CSS, которым в данном случае является alert-success, в то время как в случае с отображаемыми сообщениями был alert-info. В документации по начальной загрузке содержатся подробные сведения о структуре HTML для оповещений. Текст уведомления находится в поле description, сохраненное в модели Task, за которым следует процент выполнения.

Процентное значение заключено в элемент <span>, который имеет атрибут id. Причина этого в том, что я собираюсь обновлять процентное значение из JavaScript при получении уведомлений. Идентификатор, который я использую для данной задачи, создается как идентификатор задачи с добавлением в конце -progress. Когда придет уведомление, оно будет содержать идентификатор задачи, поэтому я могу легко найти правильный элемент <span> для обновления с помощью селектора для #<task.id>-progress.

Если вы попробуете приложение на этом этапе, вы будете видеть "статические" обновления хода выполнения при каждом переходе на новую страницу. Вы заметите, что после запуска задачи экспорта вы можете свободно перемещаться по разным страницам приложения, и состояние запущенной задачи всегда вызывается в памяти.

Чтобы подготовиться к применению динамических обновлений к элементам <span>, я собираюсь написать небольшую вспомогательную функцию на стороне JavaScript:

app/templates/base.html: Вспомогательная функция для динамического обновления хода выполнения задачи.

...
{% block scripts %}
    ...
    <script>
        ...
        function set_task_progress(task_id, progress) {
            const progressElement = document.getElementById(task_id + '-progress');
            if (progressElement) {
                progressElement.innerText = progress;
            }
        }
    </script>
    ...
{% endblock %}

Эта функция принимает идентификатор задачи id и значение прогресса, использует DOM API в браузере, чтобы найти элемент <span> для этой задачи, и если элемент существует, она записывает новый прогресс в качестве своего нового содержимого.

Уведомления уже поступают в браузер, потому что функция  _set_task_progress() в app/tasks.py вызывает add_notification() каждый раз, когда обновляется ход выполнения. Если вас смущает, как эти уведомления могли попадать в браузер без каких-либо действий со стороны меня, то на самом деле это потому, что в главе 21 я поступил мудро, реализовав функцию уведомлений совершенно универсальным способом. Любые уведомления, добавленные с помощью метода add_notification(), будут видны браузеру, когда он периодически запрашивает у сервера обновления уведомлений.

Но код JavaScript, обрабатывающий эти уведомления, распознает только те, у которых имя unread_message_count, и игнорирует остальные. Что мне нужно сделать сейчас, так это расширить эту функцию, чтобы она также обрабатывала уведомления task_progress, вызвав функцию set_task_progress(), которую я определил выше. Вот обновленная версия цикла, который обрабатывает уведомления из JavaScript.:

app/templates/base.html: Обработчик уведомлений.

for (let i = 0; i < notifications.length; i++) {
  switch (notifications[i].name) {
    case 'unread_message_count':
      set_message_count(notifications[i].data);
      break;
    case 'task_progress':
      set_task_progress(notifications[i].data.task_id,
          notifications[i].data.progress);
      break;
  }
  since = notifications[i].timestamp;
}

Теперь, когда мне нужно обрабатывать два разных уведомления, я решил заменить инструкцию if, которая проверяет название уведомления unread_message_count, в инструкции switch, которая содержит по одному разделу для каждого из уведомлений, которые мне теперь нужно поддерживать. Если вы не очень хорошо знакомы с языками семейства "C", возможно, вы раньше не встречали операторов switch. Они обеспечивают удобный синтаксис, который заменяет длинную цепочку операторов if/elseif. Это очень удобно, потому что, поскольку мне нужно поддерживать больше уведомлений, я могу просто продолжать добавлять их в виде дополнительных блоков case.

Если вы помните, данные, которые RQ прикрепляет к уведомлению task_progress, представляют собой словарь с двумя элементами, task_id и progress, которые являются двумя аргументами, которые мне нужно использовать для вызова set_task_progress().

Если вы запустите приложение сейчас, индикатор выполнения в зеленом окне оповещения будет обновляться каждые 10 секунд по мере доставки уведомлений клиенту.

Поскольку в этой главе я ввел новые переводимые строки, файлы перевода необходимо обновить. Если вы поддерживаете файл не на английском языке, вам необходимо использовать Flask-Babel для обновления ваших файлов переводов, а затем добавления новых переводов:

(venv) $ flask translate update

Если вы используете перевод на испанский, то я выполнил работу по переводу за вас, поэтому вы можете просто извлечь файл app/translations/es/LC_MESSAGES/messages.po из пакета загрузки для этой главы и добавить его в свой проект.

После завершения переводов вам необходимо скомпилировать файлы переводов:

(venv) $ flask translate compile

Рекомендации по развертыванию

Чтобы завершить эту главу, я хочу обсудить, как меняется развертывание приложения. Для поддержки фоновых задач я добавил в стек два новых компонента, сервер Redis и одного или нескольких рабочих процессов RQ. Очевидно, что они должны быть включены в вашу стратегию развертывания, поэтому я собираюсь кратко рассказать о различных вариантах развертывания, которые я рассматривал в предыдущих главах, и о том, как на них влияют эти изменения.

Развертывание на сервере Linux

Если вы запускаете свое приложение на сервере Linux, добавление Redis должно быть таким же простым, как установка этого пакета из вашей операционной системы. Для Ubuntu Linux вам необходимо запустить sudo apt-get install redis-server.

Чтобы запустить рабочий процесс RQ, вы можете следовать разделу "Настройка Gunicorn и Supervisor" в главе 17, чтобы создать вторую конфигурацию Supervisor, в которой вы запускаете rq worker microblog-tasks вместо gunicorn. Если вы хотите запустить более одного рабочего процесса (и, вероятно, вам следует это сделать для производства), вы можете использовать директиву супервизора numprocs, чтобы указать, сколько экземпляров вы хотите запустить одновременно.

Развертывание на Heroku

Для развертывания приложения на Heroku вам потребуется добавить службу Redis в свою учетную запись. Это похоже на процесс, который я использовал для добавления базы данных Postgres. В Redis также есть бесплатный уровень, который можно добавить с помощью следующей команды:

$ heroku addons:create heroku-redis:hobby-dev

URL-адрес доступа к вашей новой службе redis будет добавлен в вашу среду Heroku в качестве переменной REDIS_URL, чего и ожидает приложение.

Бесплатный тарифный план в Heroku допускает использование одного веб-модуля и одного рабочего модуля, так что вы можете разместить один рабочий процесс rq вместе с вашим приложением без каких-либо затрат. Для этого вам нужно будет объявить рабочий процесс в отдельной строке в вашем procfile:

web: flask db upgrade; flask translate compile; gunicorn microblog:app
worker: rq worker -u $REDIS_URL microblog-tasks

После развертывания с этими изменениями вы можете запустить рабочий процесс с помощью следующей команды:

$ heroku ps:scale worker=1

Развертывание в Docker

Если вы развертываете приложение в контейнерах Docker, то сначала вам нужно создать контейнер Redis в той же сети, которую использует приложение. Для этого вы можете использовать один из официальных образов Redis из реестра Docker:

$ docker run --name redis -d -p 6379:6379 --network microblog-network redis:latest

При запуске вашего приложения вам нужно будет установить переменную среды REDIS_URL, аналогично тому, как обрабатывался контейнер MySQL. Вот полная команда для запуска приложения, включая ссылку redis:

$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --network microblog-network \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@mysql/microblog \
    -e REDIS_URL=redis://redis:6379/0 \
    microblog:latest

Наконец, вам нужно будет запустить один или несколько контейнеров для рабочих процессов RQ. Поскольку рабочие процессы основаны на том же коде, что и основное приложение, вы можете использовать тот же образ контейнера, который используете для своего приложения, переопределив команду запуска, чтобы вместо веб-приложения запускался рабочий процесс. Вот пример команды docker run, которая запускает рабочий процесс:

$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
    --network microblog-network \
    -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@mysql/microblog \
    -e REDIS_URL=redis://redis:6379/0 \
    --entrypoint venv/bin/rq \
    microblog:latest worker -u redis://redis:6379/0 microblog-tasks

Переопределить команду запуска образа Docker по умолчанию немного сложно, потому что команду нужно давать в двух частях. Аргумент --entrypoint принимает только имя исполняемого файла, но аргументы (если они есть) должны быть указаны после изображения и тега в конце командной строки. Обратите внимание, что rq необходимо указать как venv/bin/rq, чтобы он работал без активации виртуальной среды.

Следующая глава =>

Комментарии (0)