Микросервисы – это парадигма, где приложение разбивается на небольшие независимые компоненты, каждый из которых отвечает за конкретную функцию. Это как отделы в офисе, каждый офис – это отдельный сервис, который может быть разработан, масштабирован и развернут независимо.

Почему асинхронность так важна для наших микросервисов? Представьте себе множество людей, ожидающих в лифте – каждый из них хочет двигаться своим темпом, и никто не хочет ждать, когда лифт подойдет к нужному этажу. Так и в мире микросервисов – каждый сервис может заниматься своей задачей, не блокируя другие. Асинхронность позволяет нам этим заниматься: вместо того чтобы ждать ответа от одного сервиса, мы можем отправить запрос другому и эффективно использовать время, пока ждем ответа.

Асинхронность в микросервисах позволяет нам связывать компоненты приложения так, чтобы они работали в гармонии, минимизируя задержки и максимизируя эффективность.

Асинхронное программирование в Python

Ключевыми инструментами асинхронного программирования в Python являются ключевые слова async и await. Когда мы объявляем функцию с использованием ключевого слова async, мы говорим Python, что это асинхронная функция и она может быть приостановлена во время выполнения.

Мы также используем оператор await внутри асинхронной функции для "ожидания" завершения асинхронной операции. В этот момент управление переходит к другим асинхронным задачам, пока выполняется операция, на которой мы ждем.

Примеры асинхронного программирования на Python:

  1. Асинхронные сетевые запросы с использованием библиотеки aiohttp:

import aiohttp
import asyncio

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3"
    ]
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

В этом примере мы используем библиотеку aiohttp для выполнения асинхронных HTTP-запросов к нескольким URL-адресам. Когда мы вызываем asyncio.gather(*tasks), все запросы выполняются параллельно, и после того, как все они завершатся, мы получаем результаты.

  1. Асинхронная обработка больших объемов данных с помощью библиотеки aiomultiprocess:

import aiomultiprocess

async def process_data(data_chunk):
    # Выполняем длительную обработку данных
    # и возвращаем результат
    return processed_data

async def main():
    data = [...]  # Большой объем данных
    chunk_size = 1000
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

    async with aiomultiprocess.Pool() as pool:
        results = await pool.map(process_data, chunks)

    # Объединяем результаты обработки данных
    final_result = merge_results(results)
    print(final_result)

asyncio.run(main())

В этом примере мы используем библиотеку aiomultiprocess для асинхронной обработки большого объема данных. Мы разбиваем данные на части и обрабатываем каждую часть параллельно в нескольких процессах. Затем мы объединяем результаты обработки данных для получения конечного результата.

Фреймворки и библиотеки для асинхронных микросервисов

Конечно, давайте глубже погрузимся в асинхронные фреймворки и библиотеки на Python, и рассмотрим примеры их использования более подробно.

Асинхронные фреймворки: FastAPI и Sanic

FastAPI - это настоящая жемчужина среди асинхронных фреймворков. Он разработан с учетом современных стандартов и требований, предоставляя разработчикам удивительный опыт в создании веб-сервисов. Проектирование API в FastAPI – это настоящее удовольствие благодаря его декларативному подходу. Декораторы позволяют описать параметры запроса, возвращаемые значения и обработчики событий очень легко и интуитивно понятно.

Например, создание простого веб-сервиса для конвертации валюты с использованием FastAPI может выглядеть так:

from fastapi import FastAPI

app = FastAPI()

@app.get("/convert")
async def convert_currency(amount: float, from_currency: str, to_currency: str):
    # Здесь можно реализовать логику конвертации валюты
    converted_amount = ...
    return {"converted_amount": converted_amount}

FastAPI также активно использует типовые подсказки Python (Type Hints) для автоматической валидации запросов и генерации интерактивной документации. Например, если вы передадите несоответствующий тип данных, FastAPI предупредит вас об этом, что спасает от многих потенциальных ошибок.

Sanic – это еще один асинхронный фреймворк, который ставит скорость и производительность в центр своей сути. Его асинхронная природа и асинхронные обработчики запросов позволяют обрабатывать большое количество одновременных подключений без значительных потерь в производительности. Sanic также предоставляет гибкость в работе с HTTP-запросами и поддерживает асинхронные операции, что делает его прекрасным выбором для создания быстрых и отзывчивых веб-приложений.

Использование библиотеки asyncio

Библиотека asyncio – это must have для асинхронного программирования в Python. Ее инструменты позволяют создавать асинхронные функции и корутины, которые могут быть объединены в асинхронные задачи для параллельного выполнения.

Пример использования asyncio для выполнения асинхронных операций с базой данных:

import asyncio
import asyncpg

async def fetch_data_from_db():
    conn = await asyncpg.connect(user='user', password='password',
                                 database='mydb', host='127.0.0.1')
    data = await conn.fetch('SELECT * FROM mytable')
    await conn.close()
    return data

async def main():
    result = await fetch_data_from_db()
    print(result)

asyncio.run(main())

В этом примере мы создаем асинхронную функцию fetch_data_from_db(), которая асинхронно подключается к базе данных, выполняет запрос и затем закрывает соединение. Затем, с помощью asyncio.run(main()), мы запускаем нашу основную асинхронную задачу.

Работа с базами данных и внешними API

В асинхронных микросервисах работа с базами данных и внешними API также важна. ия.

Использование асинхронных библиотек, таких как aiohttp для HTTP-запросов или aiomysql для работы с MySQL, позволяет вам эффективно обращаться к внешним ресурсам без блокировки других задач.

Пример использования aiohttp для асинхронных HTTP-запросов:

import aiohttp
import asyncio

async def fetch_data_from_api(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3"
    ]
    tasks = [fetch_data_from_api(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

Здесь мы используем aiohttp для асинхронного обращения к внешним API. Вместо ожидания ответа от сервера, мы асинхронно обрабатываем все запросы, что позволяет нам значительно увеличить скорость выполнения приложения.

Пример разработки асинхронного микросервиса на Python

Разработаем простой, но мощный микросервис с использованием FastAPI и асинхронных функций. Будем шаг за шагом создавать сервис, который обрабатывает запросы, взаимодействует с базой данных и предоставляет данные.

Для начала, убедимся, что у нас установлен FastAPI. Если нет, давайте установим его с помощью pip:

pip install fastapi

И так, представьте, что нам нужно создать микросервис для управления задачами в списке. Начнем с создания файла main.py и импортирования необходимых модулей:

from fastapi import FastAPI

app = FastAPI()

Теперь создадим асинхронные обработчики для наших запросов. Для примера, давайте реализуем добавление и получение списка задач.

Добавление задачи:

tasks = []

@app.post("/tasks/")
async def create_task(task: str):
    tasks.append(task)
    return {"message": "Task created successfully"}

Получение списка задач:

@app.get("/tasks/")
async def get_tasks():
    return {"tasks": tasks}

Теперь наш микросервис может принимать POST-запросы для создания новых задач и GET-запросы для получения списка задач. Но что, если мы хотим сохранять задачи в базе данных? Давайте внедрим асинхронное взаимодействие с базой данных с использованием aiomysql.

Взаимодействие с базой данных

Сначала установим aiomysql:

pip install aiomysql

Теперь добавим код для работы с базой данных:

import aiomysql

async def create_pool():
    pool = await aiomysql.create_pool(
        host="localhost", user="user",
        password="password", db="tasks_db",
        autocommit=True
    )
    return pool

async def save_task_to_db(task):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("INSERT INTO tasks (task_name) VALUES (%s)", (task,))

Изменим обработчик создания задачи:

pool = None

@app.on_event("startup")
async def startup_db():
    global pool
    pool = await create_pool()

@app.on_event("shutdown")
async def shutdown_db():
    global pool
    pool.close()
    await pool.wait_closed()

@app.post("/tasks/")
async def create_task(task: str):
    await save_task_to_db(task)
    return {"message": "Task created successfully"}

Теперь наш микросервис сохраняет задачи в базу данных, обеспечивая надежное хранение данных между запросами.

Вот как выглядит полный пример разработки асинхронного микросервиса на Python с использованием FastAPI и асинхронных функций:

from fastapi import FastAPI
import aiomysql

app = FastAPI()

pool = None

async def create_pool():
    pool = await aiomysql.create_pool(
        host="localhost", user="user",
        password="password", db="tasks_db",
        autocommit=True
    )
    return pool

async def save_task_to_db(task):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("INSERT INTO tasks (task_name) VALUES (%s)", (task,))

@app.on_event("startup")
async def startup_db():
    global pool
    pool = await create_pool()

@app.on_event("shutdown")
async def shutdown_db():
    global pool
    pool.close()
    await pool.wait_closed()

tasks = []

@app.post("/tasks/")
async def create_task(task: str):
    tasks.append(task)
    await save_task_to_db(task)
    return {"message": "Task created successfully"}

@app.get("/tasks/")
async def get_tasks():
    return {"tasks": tasks}

Таким образом, мы разработали асинхронный микросервис на Python с использованием FastAPI и асинхронных функций. Этот микросервис способен принимать запросы на создание и получение задач, а также сохранять задачи в базе данных для долгосрочного хранения. Это всего лишь начало возможностей асинхронной разработки на Python, и вы можете дальше расширять и улучшать этот микросервис, добавляя новые функции и улучшая производительность.

Управление асинхронными операциями

Обработка ошибок и исключений в асинхронных функциях

Когда мы работаем с асинхронными операциями, неизбежно встает вопрос об обработке ошибок. В асинхронных функциях ошибки могут возникнуть как в самой функции, так и в асинхронных вызовах, которые она выполняет. Пример обработки ошибок с использованием try и except:

async def divide(a, b):
    try:
        result = a / b
        return result
    except ZeroDivisionError:
        return "Division by zero is not allowed"

Также мы можем использовать asyncio.gather() для асинхронного выполнения нескольких операций и обработки исключений:

import asyncio

async def main():
    try:
        results = await asyncio.gather(
            divide(10, 2),
            divide(20, 0),
            divide(30, 3)
        )
    except Exception as e:
        print(f"An error occurred: {e}")
    else:
        print(results)

asyncio.run(main())

Мониторинг и логирование асинхронных процессов

Эффективный мониторинг и логирование асинхронных процессов - это ключевой аспект в создании надежных и отказоустойчивых приложений. Для логирования асинхронных событий мы можем использовать стандартную библиотеку logging:

import asyncio
import logging

logging.basicConfig(level=logging.INFO)

async def my_task():
    logging.info("Task started")
    await asyncio.sleep(2)
    logging.info("Task completed")

async def main():
    await asyncio.gather(my_task(), my_task())

asyncio.run(main())

Тестирование асинхронного кода

Тестирование асинхронного кода имеет свои особенности, но с правильными подходами можно создать надежные и понятные тесты. Для тестирования асинхронных функций мы можем использовать библиотеку pytest и ее асинхронные возможности:

import asyncio
import pytest

async def async_add(a, b):
    await asyncio.sleep(1)  # имитируем асинхронную операцию
    return a + b

@pytest.mark.asyncio
async def test_async_add():
    result = await async_add(2, 3)
    assert result == 5

Обработка ошибок, мониторинг, логирование и тестирование – все это важные инструменты, позволяющие создавать надежные и эффективные асинхронные приложения на Python. Ваше понимание этих аспектов значительно обогатит вашу разработку и поможет создать более качественное программное обеспечение.

Заключение

В заключении, асинхронные микросервисы на Python предоставляют нам мощные инструменты для создания высокопроизводительных, масштабируемых и отзывчивых приложений.

Данная статья подготовлена в преддверии старта курса Microservice Architecture. На странице курса вы можете подробно ознакомиться с программой, а также зарегистрироваться на бесплатные вебинары.

Комментарии (2)


  1. S0mbre
    04.08.2023 01:24

    В разделе про базы данных зачем-то заново дан пример по aiohttp...


  1. baldr
    04.08.2023 01:24

    Ох, OTUS- символ низкокачественных статей на Хабре... Снова неправильный пример, скопированный откуда-то из глубин индийского интернета..

    Держите правильный код, просто дарю. Можете в статье поправить (да, в обоих местах).

    Пояснение: в изначальном примере сессия создается внутри функции fetch_data, что ведет к очень большим накладным расходам - создание нового TCP-соединения. Если сессию сначала создать, а потом передать в функцию - можно добиться более высокой производительности и снижения нагрузок на сеть и сервера. Вот просто первая попавшаяся нагугленная статья по теме. Вообще ClientSession принимает много параметров и их нужно использовать если у вас что-то больше чем просто один раз скачать одну страничку.

    Однако, XXI век не стоит на дворе и пора бы переходить с aiohttp на httpx.

    import aiohttp
    import asyncio
    
    async def fetch_data(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        urls = [
            "https://api.example.com/data/1",
            "https://api.example.com/data/2",
            "https://api.example.com/data/3"
        ]
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_data(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        for result in results:
            print(result)
    
    asyncio.run(main())