Привет, Хабр!
Gevent — это высокопроизводительная асинхронная библиотека для Python, которая решает проблемы многозадачности с помощью корутин, известных как "зеленые потоки" или Greenlets. Зеленые потоки — легковесные корутины, которые позволяют выполнять задачи одновременно без затрат ресурсов, связанных с традиционными потоками.
Также Gevent преуспевает в неблокирующих операциях ввода/вывода и имеется встроенная техника под названием "monkey patching", которая модифицирует некоторые модули стандартной библиотеки, делая их кооперативными. Так можно преобразовывать блокирующие вызовы в неблокирующие в рамках среды Gevent.
Рассмотрим библиотеку подробней. Но для начала - установим:
pip install gevent
Основной синтаксис
Создание и управление зелеными потоками
gevent.spawn(): создание зеленых потоков
import gevent
def task(message):
print(message)
# создание зеленого потока
greenlet = gevent.spawn(task, "Hello, Habr!")
# ожидание завершения зеленого потока
greenlet.join()
Фнкция создает новый зеленый поток, который выполняет указанную функцию task
с переданным аргументом "Hello, Habr!"
. join()
используется для ожидания завершения зеленого потока.
gevent.spawn_later(): отложенный запуск зеленых потоков
import gevent
def delayed_task():
print("Task executed after delay")
# отложенный запуск через 5 секунд
greenlet = gevent.spawn_later(5, delayed_task)
# ожидание завершения зеленого потока
greenlet.join()
Функция запускает зеленый поток через заданное время (в нашем случае 5 секунд).
gevent.kill() и gevent.killall(): остановка зеленых потоков
import gevent
def task():
gevent.sleep(10)
print("This will not be printed")
# создание и запуск зеленого потока
greenlet = gevent.spawn(task)
# остановка зеленого потока
greenlet.kill()
# создание и запуск нескольких зеленых потоков
greenlets = [gevent.spawn(task) for _ in range(3)]
# остановка всех зеленых потоков
gevent.killall(greenlets)
kill()
используется для остановки одного зеленого потока, а killall()
— для остановки всех переданных зеленых потоков.
gevent.joinall(): ожидание завершения всех зеленых потоков
import gevent
def task(id):
gevent.sleep(id)
print(f"Task {id} completed")
# создание и запуск нескольких зеленых потоков
greenlets = [gevent.spawn(task, i) for i in range(5)]
# ожидание завершения всех зеленых потоков
gevent.joinall(greenlets)
joinall()
позволяет ожидать завершения всех зеленых потоков из переданного списка.
Работа с синхронизацией
gevent.lock(): замки
import gevent
from gevent.lock import Semaphore
sem = Semaphore(1)
def task1():
with sem:
print("Task 1 acquired lock")
gevent.sleep(2)
print("Task 1 released lock")
def task2():
with sem:
print("Task 2 acquired lock")
gevent.sleep(1)
print("Task 2 released lock")
gevent.joinall([
gevent.spawn(task1),
gevent.spawn(task2)
])
Семафоры используются для управления доступом к ресурсам между зелеными потоками.
gevent.event(): события
import gevent
from gevent.event import Event
evt = Event()
def setter():
gevent.sleep(3)
print("Event set")
evt.set()
def waiter():
print("Waiting for event")
evt.wait()
print("Event received")
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter)
])
События используются для синхронизации зеленых потоков, позволяя одному потоку ждать, пока другой не выполнит определенное действие.
Взаимодействие с I/O
gevent.socket: кооперативные сокеты
import gevent
from gevent import socket
def fetch(host):
sock = socket.create_connection((host, 80))
sock.sendall(b"GET / HTTP/1.1\r\nHost: " + host.encode('ascii') + b"\r\n\r\n")
data = sock.recv(1024)
print(f"{host}: {data}")
hosts = ['www.google.com', 'otus.ru', 'habr.com']
jobs = [gevent.spawn(fetch, host) for host in hosts]
gevent.joinall(jobs)
С кооперативными сокетами можно выполнять сетевые операции асинхронно, не блокируя выполнение программы.
gevent.subprocess: работа с subprocess в асинхронном режиме
import gevent
from gevent import subprocess
def run_command():
process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
out, err = process.communicate()
print(out.decode('utf-8'))
gevent.spawn(run_command).join()
С модулем gevent.subprocess
можно асинхронно запускать и управлять внешними процессами.
Monkey Patching
gevent.monkey.patch_all(): кооперативный патчинг стандартных библиотек
from gevent import monkey
monkey.patch_all()
import socket
import requests
def fetch(url):
response = requests.get(url)
print(f"Fetched {len(response.content)} bytes from {url}")
urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)
Функция monkey.patch_all()
заменяет стандартные блокирующие вызовы на кооперативные.
Примеры использования
В большинстве примеров будем использовать Flask для создания API сервиса обмена сообщениями.
Веб-сервер с использованием Gevent
Создадим веб-сервер, который может одновременно обслуживать множество клиентов:
from gevent import monkey
monkey.patch_all()
from gevent.pywsgi import WSGIServer
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/api', methods=['GET'])
def api():
return jsonify({'message': 'Hello, Habr!'})
if __name__ == '__main__':
http_server = WSGIServer(('0.0.0.0', 5000), app)
print("Server running on http://0.0.0.0:5000")
http_server.serve_forever()
monkey.patch_all()
используется для кооперативного патчинга стандартных библиотек, чтобы сделать их совместимыми с Gevent.
WSGIServer
из gevent.pywsgi
создает веб-сервер, который может обслуживать несколько запросов одновременно.
Веб-скрапинг с использованием Gevent
Гео-скрапинг сайтов для извлечения данных параллельно:
import gevent
from gevent import monkey
monkey.patch_all()
import requests
from bs4 import BeautifulSoup
def fetch(url):
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find('title').text
print(f"Title: {title} from {url}")
urls = [
'http://www.google.com',
'http://www.example.com',
'http://www.python.org'
]
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)
monkey.patch_all()
для кооперативного выполнение сетевых запросов.
requests
используется для отправки HTTP-запросов.
BeautifulSoup
используется для парсинга HTML и извлечения данных.
Асинхронные микросервисы с использованием Gevent
Создадим микросервис, который выполняет асинхронные операции ввода-вывода:
from gevent import monkey
monkey.patch_all()
import gevent
from gevent.queue import Queue
from flask import Flask, jsonify
app = Flask(__name__)
queue = Queue()
@app.route('/add_task', methods=['POST'])
def add_task():
task = request.json.get('task')
queue.put(task)
return jsonify({'status': 'Task added'}), 201
@app.route('/get_task', methods=['GET'])
def get_task():
try:
task = queue.get_nowait()
except gevent.queue.Empty:
task = None
return jsonify({'task': task})
def worker():
while True:
task = queue.get()
if task:
print(f"Processing task: {task}")
gevent.sleep(2)
if __name__ == '__main__':
workers = [gevent.spawn(worker) for _ in range(3)]
http_server = WSGIServer(('0.0.0.0', 5000), app)
print("Microservice running on http://0.0.0.0:5000")
http_server.start()
gevent.joinall(workers + [http_server])
Создается очередь задач Queue
, в которую добавляются задачи через HTTP-запросы.
Рабочие worker
обрабатывают задачи из очереди параллельно.
Управление потоками данных в реальном времени
Создадим сервис для обработки данных в реальном времени, например как чат или игровая комната:
from gevent import monkey
monkey.patch_all()
import gevent
from gevent.queue import Queue
from flask import Flask, request, jsonify
app = Flask(__name__)
message_queues = {}
@app.route('/send_message', methods=['POST'])
def send_message():
username = request.json.get('username')
message = request.json.get('message')
if username in message_queues:
message_queues[username].put_nowait(message)
return jsonify({'status': 'Message sent'}), 201
else:
return jsonify({'error': 'User not connected'}), 404
@app.route('/receive_messages/<username>', methods=['GET'])
def receive_messages(username):
if username not in message_queues:
message_queues[username] = Queue()
messages = []
try:
while True:
message = message_queues[username].get_nowait()
messages.append(message)
except gevent.queue.Empty:
pass
return jsonify({'messages': messages})
if __name__ == '__main__':
http_server = WSGIServer(('0.0.0.0', 5000), app)
print("Real-time messaging service running on http://0.0.0.0:5000")
http_server.serve_forever()
Создали очереди сообщений для каждого юзера.
Сообщения добавляются в соответствующую очередь и могут быть извлечены через HTTP-запросы.
Подробнее с библиотекой можно ознакомиться здесь.
В завершение статьи хочу пригласить вас на бесплатный вебинар курса Highload Architect. Регистрация доступна по ссылке.
Комментарии (3)
Akorabelnikov
24.06.2024 11:44Ужас, это ИИ рерайтит старый неактуальный код, чтобы на Хабре публиковаться? Поставьте за меня минус, я не могу
kivsiak
Вот серьезно? В 2024 году когда есть есть и поддержка асинхронщины на уровне языка и рантаймы типа asyncio и trio и фреймворки типа fastapi - рассказывать про gevent с его манкипатчингом? Из каких запасников вы извлекли эту изрядно заплесневевшую статью?
spooph
У этого автора 285 статей за 3 месяца. Просто копипастят со всего интернета весь мусор, что найдут, для накрутки кармы.
Видимо, оригинал статьи родом из 2015 года, золотое время для gevent'a)