Это продолжение цикла статей про asyncio. Начало здесь.

6. Веб-сервер aiohttp и другие жители асинхронного мира

Продолжаем готовить asyncio. Теперь мы уже знаем достаточно, чтобы написать модный асинхронный микросервис. Реализуем известный архитектурный паттерн "API-шлюз". Это довольно простая штука. По запросу на свой API-интерфейс приложение собирает данные из других API, обрабатывает и возвращает результат пользователю. При этом пользователь знает только одну точку входа, а все внутренние подробности (кто куда и зачем сходил) от него скрыты.

В предыдущей главе мы научились запрашивать погоду у сервиса api.openweathermap.org. Давайте его слегка украдем импортозаместим. Вернее сказать, русифицируем. Пусть пользователь нашего сервиса передает название города на русском языке в параметрах GET-запроса и получает ответ в виде json опять-таки на великом и могучем. А откуда мы взяли информацию о погоде, пользователю знать не положено. Может у нас собственные метеостанции в каждой деревне стоят, поди проверь.

Мы уже освоили http-клиента библиотеки aiohttp, с помощью которого можно обращаться к внешним API. Оказывается, в этой же библиотеке есть и все необходимое для создания полноценного http-сервера. Для начала напишем просто зеркальный прокси:

Пример 6.1

import asyncio
import json
from aiohttp import ClientSession, web


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather'
        params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

        async with session.get(url=url, params=params) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def handle(request):
    city = request.rel_url.query['city']
    weather = await get_weather(city)
    result = {'city': city, 'weather': weather}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


async def main():
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    while True:
        await asyncio.sleep(3600)


if __name__ == '__main__':
    asyncio.run(main())

В асинхронной функции get_weather ничего нового нет, мы ее лишь слегка причесали, чтобы запрос погоды для несуществующего города не приводил к трагическим последствиям для всего нашего приложения. За обработку запроса отвечает функция handle ("ручка" на сленге бэкендеров). Из запроса извлекается параметр city и передается в get_weather. Далее формируется результирующий ответ в виде json. Адрес нашего сервиса и тип запроса задается вapp.add_routes.

Стоп! А где же задачи? Не переживайте. Когда мы имеем дело с асинхронными веб-фреймворками (а aiohttp — это именно фреймворк, хоть и супер-минималистический), вся работа по созданию и запуску задач asyncio происходит у фреймворка "под капотом". Нам, как разработчикам, теперь нет нужды беспокоится об этих низменных деталях. Приложение мирно спит в бесконечном неблокирующем цикле, пока не придет запрос GET на определенный URL. Как только это произойдет, отработает логика в ручке. И снова баю-бай до следующего запроса. Но если первый запрос еще не успел обработаться, как поступил следующий, фреймворк отработает его в отдельной задаче, не дожидаясь (по возможности) окончания обработки первого. В этом сама суть асинхронности.

Заходим браузером на адрес: localhost:8080/weather?city=Sochi и получаем симпатичный json:

{"city": "Sochi", "weather": "Clouds"}

Кстати, если вы всерьез решили заняться бэкенд-разработкой, одним браузером вам никак не обойтись. Потребуется инструмент, позволяющий залезать вглубь HTTP. Стандарт де-факто здесь Postman, но в природе существуют и альтернативные решения.

Скелет нашего приложения готов, теперь начинаем наращивать на нем мышцы. Воспользуемся бесплатным API переводчика libretranslate.de:

Пример 6.2

import asyncio
import json
from aiohttp import ClientSession, web


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather'
        params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

        async with session.get(url=url, params=params) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def get_translation(text, source, target):
    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']
    city_en = await get_translation(city_ru, 'ru', 'en')

    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


async def main():
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    while True:
        await asyncio.sleep(3600)


if __name__ == '__main__':
    asyncio.run(main())

Теперь в ручке дважды вызывается асинхронная функция get_translation (обратите внимание, на этот раз мы передаем параметры внешнему сервису в виде json через тело запроса POST) и, вуаля:

localhost:8080/weather?city=Сочи

{"city": "Сочи", "weather": "Облака"}

Над всей Испанией безоблачное небо, а в деревне Гадюкино опять идут дожди...

Но что это за микросервис без логгера? Однако использовать в насквозь асинхронном приложении привычную синхронную (а значит блокирующую) библиотеку logging — это не наш путь. Воспользуемся правильной асинхронной библиотекой логгирования aiologger:

Пример 6.3

import asyncio
import json
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger


logger = JsonLogger.with_default_handlers(
            level='DEBUG',
            serializer_kwargs={'ensure_ascii': False},
        )


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather'
        params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

        async with session.get(url=url, params=params) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def get_translation(text, source, target):
    await logger.info(f'Поступил запрос на на перевод слова: {text}')

    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                logger.error(f'Невозможно получить перевод для слова: {text}')
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']

    await logger.info(f'Поступил запрос на город: {city_ru}')

    city_en = await get_translation(city_ru, 'ru', 'en')
    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    return web.Response(text=json.dumps(result, ensure_ascii=False))


async def main():
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    while True:
        await asyncio.sleep(3600)


if __name__ == '__main__':
    asyncio.run(main())

Обратите внимание, асинхронный логгер ни на миллисекунду не задержит работу нашего приложения в целом. Он будет использовать паузы, пока мы ожидаем запроса от пользователя или ответа от внешнего сервиса. Поэтому, если мы будем наблюдать за его работой под серьезной нагрузкой, может обнаружиться, что он заметно отстает от реального времени. Но в конечном итоге все что должно будет выведено в лог и ни один символ не потеряется.

Ну а теперь вишенка на торте — асинхронный доступ к базе данных. Предположим, в процессе работы нашего приложения нам надо что-то писать в БД, ну, например, сохранять поступившие запросы (ничего более умного мне как-то в голову не пришло). Самая простая БД в мире, как известно, — это SQLite. И для нее, к счастью, есть асинхронный драйвер aiosqlite. Пробуем:

Пример 6.4

import json
import aiosqlite
import asyncio
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
from datetime import datetime


logger = JsonLogger.with_default_handlers(
            level='DEBUG',
            serializer_kwargs={'ensure_ascii': False},
        )


async def create_table():
    async with aiosqlite.connect('weather.db') as db:
        await db.execute('CREATE TABLE IF NOT EXISTS requests '
                         '(date text, city text, weather text)')
        await db.commit()


async def save_to_db(city, weather):
    async with aiosqlite.connect('weather.db') as db:
        await db.execute('INSERT INTO requests VALUES (?, ?, ?)',
                         (datetime.now(), city, weather))
        await db.commit()


async def get_weather(city):
    async with ClientSession() as session:
        url = f'http://api.openweathermap.org/data/2.5/weather'
        params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

        async with session.get(url=url, params=params) as response:
            weather_json = await response.json()
            try:
                return weather_json["weather"][0]["main"]
            except KeyError:
                return 'Нет данных'


async def get_translation(text, source, target):
    await logger.info(f'Поступил запрос на на перевод слова: {text}')

    async with ClientSession() as session:
        url = 'https://libretranslate.de/translate'

        data = {'q': text, 'source': source, 'target': target, 'format': 'text'}

        async with session.post(url, json=data) as response:
            translate_json = await response.json()

            try:
                return translate_json['translatedText']
            except KeyError:
                logger.error(f'Невозможно получить перевод для слова: {text}')
                return text


async def handle(request):
    city_ru = request.rel_url.query['city']

    await logger.info(f'Поступил запрос на город: {city_ru}')

    city_en = await get_translation(city_ru, 'ru', 'en')
    weather_en = await get_weather(city_en)
    weather_ru = await get_translation(weather_en, 'en', 'ru')

    result = {'city': city_ru, 'weather': weather_ru}

    await save_to_db(city_ru, weather_ru)

    return web.Response(text=json.dumps(result, ensure_ascii=False))


async def main():
    await create_table()
    app = web.Application()
    app.add_routes([web.get('/weather', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    while True:
        await asyncio.sleep(3600)


if __name__ == '__main__':
    asyncio.run(main())

Можете заглянуть внутрь созданного на лету файла weather.db (только используйте не текстовый просмотрщик, а какую-нибудь утилиту для работы с БД, например, DBeaver). Для каждого запроса создается соответствующая запись в таблице requests. И снова никаких блокировок, мы ведь живем в асинхронном мире.

В заключение этого раздела хочу вас поздравить. Теперь вы имеете в руках все необходимое для создания собственных асинхронных веб-приложений. Неважно какой фреймворк вы будете использовать: FastAPI, Tornado, Falcon или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp: создаем ручку и в ней нанизываем "шашлык" из вызовов асинхронных функций. Главное, за чем необходимо следить — это чтобы в эти функции не затесался какой-нибудь блокирующий зловред из скучной пыльной синхронной вселенной.

На этом временно прощаемся.

Продолжение следует...

Комментарии (7)


  1. andriymoskal
    17.06.2022 01:27
    +2

    People constantly asked about ability to run aiohttp servers together with other asyncio code, but aiohttp.web.run_app() is blocking synchronous call.

    run_app() provides a simple blocking API for running an Application.

    For starting the application asynchronously or serving on multiple HOST/PORT AppRunner exists.


    1. vlakir Автор
      17.06.2022 08:36

      Спасибо за ценное замечание! Действительно, мой косяк - надо внимательнее читать документацию. Поправил и плюс в карму.


  1. funca
    17.06.2022 04:47
    +1

    sql = f'INSERT INTO requests VALUES ("{datetime.now()}", "{city}", "{weather}")'

    await db.execute(sql)

    Я понимаю, что примеры упрощённые. Но ведь написать запрос без потенциального SQL injection заняло бы столько же строчек. Вы же учить новичков пытаетесь, так учите хорошему.

    url = f'http://api.openweathermap.org/data/2.5/weather' \

    f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56'

    Так подставлять параметр в URL тоже не безопасно.


    1. vlakir Автор
      17.06.2022 08:45
      +1

      Спасибо, согласен. Поправил.


      1. funca
        17.06.2022 20:41

        Теперь гораздо лучше. Вообще мне нравится ваш стиль - просто о сложном. Надеюсь на продолжение цикла.


  1. TheRealMaN
    19.06.2022 14:07

    Неважно какой фреймворк вы будете использовать: FastAPITornadoFalcon или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp: создаем ручку и в ней нанизываем "шашлык" из вызовов асинхронных функций.

    Очень интересная статья, а можно, пожалуйста, небольшой пример кода, как запустить FastAPI в async режиме?


    1. vlakir Автор
      19.06.2022 14:15

      Вот здесь посмотрите. И как запустить это под uvicorn тут.