Привет, Хабр!

Gevent — это высокопроизводительная асинхронная библиотека для Python, которая решает проблемы многозадачности с помощью корутин, известных как "зеленые потоки" или Greenlets. Зеленые потоки — легковесные корутины, которые позволяют выполнять задачи одновременно без затрат ресурсов, связанных с традиционными потоками.

Также Gevent преуспевает в неблокирующих операциях ввода/вывода и имеется встроенная техника под названием "monkey patching", которая модифицирует некоторые модули стандартной библиотеки, делая их кооперативными. Так можно преобразовывать блокирующие вызовы в неблокирующие в рамках среды Gevent.

Рассмотрим библиотеку подробней. Но для начала - установим:

pip install gevent

Основной синтаксис

Создание и управление зелеными потоками

gevent.spawn(): создание зеленых потоков

import gevent

def task(message):
    print(message)

# создание зеленого потока
greenlet = gevent.spawn(task, "Hello, Habr!")
# ожидание завершения зеленого потока
greenlet.join()

Фнкция создает новый зеленый поток, который выполняет указанную функцию task с переданным аргументом "Hello, Habr!". join() используется для ожидания завершения зеленого потока.

gevent.spawn_later(): отложенный запуск зеленых потоков

import gevent

def delayed_task():
    print("Task executed after delay")

# отложенный запуск через 5 секунд
greenlet = gevent.spawn_later(5, delayed_task)
# ожидание завершения зеленого потока
greenlet.join()

Функция запускает зеленый поток через заданное время (в нашем случае 5 секунд).

gevent.kill() и gevent.killall(): остановка зеленых потоков

import gevent

def task():
    gevent.sleep(10)
    print("This will not be printed")

# создание и запуск зеленого потока
greenlet = gevent.spawn(task)
# остановка зеленого потока
greenlet.kill()

# создание и запуск нескольких зеленых потоков
greenlets = [gevent.spawn(task) for _ in range(3)]
# остановка всех зеленых потоков
gevent.killall(greenlets)

kill() используется для остановки одного зеленого потока, а killall() — для остановки всех переданных зеленых потоков.

gevent.joinall(): ожидание завершения всех зеленых потоков

import gevent

def task(id):
    gevent.sleep(id)
    print(f"Task {id} completed")

# создание и запуск нескольких зеленых потоков
greenlets = [gevent.spawn(task, i) for i in range(5)]
# ожидание завершения всех зеленых потоков
gevent.joinall(greenlets)

joinall() позволяет ожидать завершения всех зеленых потоков из переданного списка.

Работа с синхронизацией

gevent.lock(): замки

import gevent
from gevent.lock import Semaphore

sem = Semaphore(1)

def task1():
    with sem:
        print("Task 1 acquired lock")
        gevent.sleep(2)
        print("Task 1 released lock")

def task2():
    with sem:
        print("Task 2 acquired lock")
        gevent.sleep(1)
        print("Task 2 released lock")

gevent.joinall([
    gevent.spawn(task1),
    gevent.spawn(task2)
])

Семафоры используются для управления доступом к ресурсам между зелеными потоками.

gevent.event(): события

import gevent
from gevent.event import Event

evt = Event()

def setter():
    gevent.sleep(3)
    print("Event set")
    evt.set()

def waiter():
    print("Waiting for event")
    evt.wait()
    print("Event received")

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter)
])

События используются для синхронизации зеленых потоков, позволяя одному потоку ждать, пока другой не выполнит определенное действие.

Взаимодействие с I/O

gevent.socket: кооперативные сокеты

import gevent
from gevent import socket

def fetch(host):
    sock = socket.create_connection((host, 80))
    sock.sendall(b"GET / HTTP/1.1\r\nHost: " + host.encode('ascii') + b"\r\n\r\n")
    data = sock.recv(1024)
    print(f"{host}: {data}")

hosts = ['www.google.com', 'otus.ru', 'habr.com']
jobs = [gevent.spawn(fetch, host) for host in hosts]
gevent.joinall(jobs)

С кооперативными сокетами можно выполнять сетевые операции асинхронно, не блокируя выполнение программы.

gevent.subprocess: работа с subprocess в асинхронном режиме

import gevent
from gevent import subprocess

def run_command():
    process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
    out, err = process.communicate()
    print(out.decode('utf-8'))

gevent.spawn(run_command).join()

С модулем gevent.subprocess можно асинхронно запускать и управлять внешними процессами.

Monkey Patching

gevent.monkey.patch_all(): кооперативный патчинг стандартных библиотек

from gevent import monkey
monkey.patch_all()

import socket
import requests

def fetch(url):
    response = requests.get(url)
    print(f"Fetched {len(response.content)} bytes from {url}")

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)

Функция monkey.patch_all() заменяет стандартные блокирующие вызовы на кооперативные.

Примеры использования

В большинстве примеров будем использовать Flask для создания API сервиса обмена сообщениями.

Веб-сервер с использованием Gevent

Создадим веб-сервер, который может одновременно обслуживать множество клиентов:

from gevent import monkey
monkey.patch_all()

from gevent.pywsgi import WSGIServer
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api', methods=['GET'])
def api():
    return jsonify({'message': 'Hello, Habr!'})

if __name__ == '__main__':
    http_server = WSGIServer(('0.0.0.0', 5000), app)
    print("Server running on http://0.0.0.0:5000")
    http_server.serve_forever()

monkey.patch_all() используется для кооперативного патчинга стандартных библиотек, чтобы сделать их совместимыми с Gevent.

WSGIServer из gevent.pywsgi создает веб-сервер, который может обслуживать несколько запросов одновременно.

Веб-скрапинг с использованием Gevent

Гео-скрапинг сайтов для извлечения данных параллельно:

import gevent
from gevent import monkey
monkey.patch_all()

import requests
from bs4 import BeautifulSoup

def fetch(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    title = soup.find('title').text
    print(f"Title: {title} from {url}")

urls = [
    'http://www.google.com',
    'http://www.example.com',
    'http://www.python.org'
]

jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)

monkey.patch_all() для кооперативного выполнение сетевых запросов.

requests используется для отправки HTTP-запросов.

BeautifulSoup используется для парсинга HTML и извлечения данных.

Асинхронные микросервисы с использованием Gevent

Создадим микросервис, который выполняет асинхронные операции ввода-вывода:

from gevent import monkey
monkey.patch_all()

import gevent
from gevent.queue import Queue
from flask import Flask, jsonify

app = Flask(__name__)
queue = Queue()

@app.route('/add_task', methods=['POST'])
def add_task():
    task = request.json.get('task')
    queue.put(task)
    return jsonify({'status': 'Task added'}), 201

@app.route('/get_task', methods=['GET'])
def get_task():
    try:
        task = queue.get_nowait()
    except gevent.queue.Empty:
        task = None
    return jsonify({'task': task})

def worker():
    while True:
        task = queue.get()
        if task:
            print(f"Processing task: {task}")
            gevent.sleep(2)

if __name__ == '__main__':
    workers = [gevent.spawn(worker) for _ in range(3)]
    http_server = WSGIServer(('0.0.0.0', 5000), app)
    print("Microservice running on http://0.0.0.0:5000")
    http_server.start()
    gevent.joinall(workers + [http_server])

Создается очередь задач Queue, в которую добавляются задачи через HTTP-запросы.

Рабочие worker обрабатывают задачи из очереди параллельно.

Управление потоками данных в реальном времени

Создадим сервис для обработки данных в реальном времени, например как чат или игровая комната:

from gevent import monkey
monkey.patch_all()

import gevent
from gevent.queue import Queue
from flask import Flask, request, jsonify

app = Flask(__name__)
message_queues = {}

@app.route('/send_message', methods=['POST'])
def send_message():
    username = request.json.get('username')
    message = request.json.get('message')
    if username in message_queues:
        message_queues[username].put_nowait(message)
        return jsonify({'status': 'Message sent'}), 201
    else:
        return jsonify({'error': 'User not connected'}), 404

@app.route('/receive_messages/<username>', methods=['GET'])
def receive_messages(username):
    if username not in message_queues:
        message_queues[username] = Queue()
    messages = []
    try:
        while True:
            message = message_queues[username].get_nowait()
            messages.append(message)
    except gevent.queue.Empty:
        pass
    return jsonify({'messages': messages})

if __name__ == '__main__':
    http_server = WSGIServer(('0.0.0.0', 5000), app)
    print("Real-time messaging service running on http://0.0.0.0:5000")
    http_server.serve_forever()

Создали очереди сообщений для каждого юзера.

Сообщения добавляются в соответствующую очередь и могут быть извлечены через HTTP-запросы.


Подробнее с библиотекой можно ознакомиться здесь.

В завершение статьи хочу пригласить вас на бесплатный вебинар курса Highload Architect. Регистрация доступна по ссылке.

Комментарии (3)


  1. kivsiak
    24.06.2024 11:44
    +9

    Вот серьезно? В 2024 году когда есть есть и поддержка асинхронщины на уровне языка и рантаймы типа asyncio и trio и фреймворки типа fastapi - рассказывать про gevent с его манкипатчингом? Из каких запасников вы извлекли эту изрядно заплесневевшую статью?


    1. spooph
      24.06.2024 11:44
      +6

      У этого автора 285 статей за 3 месяца. Просто копипастят со всего интернета весь мусор, что найдут, для накрутки кармы.

      Видимо, оригинал статьи родом из 2015 года, золотое время для gevent'a)


  1. Akorabelnikov
    24.06.2024 11:44

    Ужас, это ИИ рерайтит старый неактуальный код, чтобы на Хабре публиковаться? Поставьте за меня минус, я не могу