Привет, Хабр! Сегодня поговорим о lock-free (или же без использования блокировок) структурах данных и атомарных операциях в Rust.
Каждый lock может стать узким местом, тормозящим всю систему. Базовые методы синхронизации, типо мьютексов и семафор, частенько (но не всегда) снижают производительность из-за блокировок и контекстных переключений.
lock-free структуры данных позволяют нескольким потокам одновременно читать и изменять данные без блокировок.
Основы lock-free программирования в Rust
Rust дает безопасность многопоточной работы благодаря своей модели владения и системе типов:
-
Модель владения
Каждый ресурс в Rust имеет владельца, и только один поток может быть владельцем ресурса в конкретный момент времени. Это предотвращает возможность конкурентного доступа к одному и тому же ресурсу без синхронизации.
Жизненные циклы контролят время жизни ссылок, предотвращая использование недействительных ссылок.
-
Типы и проверка на этапе компиляции
Типаж
Send
: этот типаж указывает, что тип данных может быть безопасно передан между потоками.Типаж
Sync
: указывает, что тип данных может быть безопасно доступен из нескольких потоков одновременно.
Rust проверяет правильность использования типажей Send
и Sync
на этапе компиляции.
use std::sync::Arc;
use std::thread;
fn main() {
let data = Arc::new(vec![1, 2, 3]);
let mut handles = vec![];
for _ in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
println!("{:?}", data);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
Здесь юзаем Arc
для безопасного разделения данных между потоками. Типаж Send
позволяет перемещать Arc
между потоками, а Sync
дает безопасный доступ к данным.
Атомарные типы данных в Rust позволяют выполнять операции над данными без блокировок.
Основные атомарные типы данных в стандартной библиотеке Rust включают:
-
AtomicBool
Атомарная булевая переменная.
-
Пример:
use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; let flag = AtomicBool::new(false); let handle = thread::spawn(move || { flag.store(true, Ordering::Relaxed); }); handle.join().unwrap(); assert_eq!(flag.load(Ordering::Relaxed), true);
-
AtomicIsize и AtomicUsize
Атомарные целые числа (signed и unsigned).
-
Пример:
use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; let counter = AtomicUsize::new(0); let handles: Vec<_> = (0..10).map(|_| { let counter = &counter; thread::spawn(move || { for _ in 0..1000 { counter.fetch_add(1, Ordering::SeqCst); } }) }).collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(counter.load(Ordering::SeqCst), 10000);
-
AtomicPtr
Атомарный указатель.
-
Пример:
use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr; let mut data = 5; let atomic_ptr = AtomicPtr::new(&mut data); let new_data = 10; atomic_ptr.store(&mut new_data, Ordering::SeqCst); assert_eq!(unsafe { *atomic_ptr.load(Ordering::SeqCst) }, 10);
Атомарные операции обеспечивают безопасность и целостность данных без необходимости использования блокировок!
Crossbeam для lock-free структур данных
Библиотека crossbeam предназначена для упрощения многопоточного программирования и высокопроизводительных безопасных структур данных lock-free.
Основные элементы Crossbeam:
Структуры данных: многопоточные структуры данных, такие как очереди и деки.
Epoch-based garbage collection: механизм управления памятью, который минимизирует блокировки при сборе мусора.
Concurrency primitives: синхронизационные примитивы для упрощения написания конкурентного кода.
ArrayQueue
ArrayQueue представляет собой lock-free очередь, реализованную на массиве с фикс. размером. Эта очередь идеально подходит для сценариев, где известно макс. количество элементов, которое нужно хранить. Пример:
use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;
fn main() {
let queue = Arc::new(ArrayQueue::new(100));
let mut handles = vec![];
for i in 0..5 {
let queue = Arc::clone(&queue);
handles.push(thread::spawn(move || {
for j in 0..20 {
queue.push(i * 20 + j).unwrap();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
while let Ok(val) = queue.pop() {
println!("{}", val);
}
}
Несколько потоков добавляют элементы в ArrayQueue, а затем элементы извлекаются из очереди.
SegQueue
SegQueue — это lock-free очередь с динамическим расширением, которая подходит для сценариев с неопределенным количеством элементов. Пример:
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;
fn main() {
let queue = Arc::new(SegQueue::new());
let mut handles = vec![];
for i in 0..5 {
let queue = Arc::clone(&queue);
handles.push(thread::spawn(move || {
for j in 0..20 {
queue.push(i * 20 + j);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
while let Some(val) = queue.pop() {
println!("{}", val);
}
}
SegQueue используется аналогично ArrayQueue, но не ограничена фиксированным размером.
epoch-based garbage collection
Epoch-based garbage collection юзают для управления памятью без блокировок. Основная фича состоит в том, чтобы разделить время на epochs и отслеживать, когда каждый поток находится в активном состоянии.
Из чего состоит epoch-based GC:
Global epoch counter: глобальный счетчик эпох, который отслеживает текущую эпоху.
Thread-local epoch counter: локальный для потока счетчик эпох, который обновляется при каждом входе в критическую секцию.
Garbage lists: списки мусора для каждой epoch, в которые добавляются освобождаемые объекты.
Пример:
use crossbeam_epoch as epoch;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
struct Node {
value: i32,
next: AtomicPtr<Node>,
}
fn main() {
let n1 = Box::into_raw(Box::new(Node {
value: 1,
next: AtomicPtr::new(ptr::null_mut()),
}));
let n2 = Box::into_raw(Box::new(Node {
value: 2,
next: AtomicPtr::new(n1),
}));
let head = AtomicPtr::new(n2);
epoch::pin(|scope| {
let h = head.load(Ordering::Relaxed, scope);
unsafe {
if !h.is_null() {
println!("Value: {}", (*h).value);
}
}
});
unsafe {
drop(Box::from_raw(n1));
drop(Box::from_raw(n2));
}
}
Создали простую lock-free структуру данных и юзаем epoch-based GC для управления памятью.
Примеры использования
Многопоточная обработка событий в серверной системе
В серверных приложениях нужно обрабатывать множество событий, поступающих от различных клиентов. С lock-free очередями можно избежать задержек, связанных с блокировками:
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
struct Event {
id: u32,
payload: String,
}
fn main() {
let event_queue = Arc::new(SegQueue::new());
let mut handles = vec![];
// поток, добавляющий события в очередь
let producer_queue = Arc::clone(&event_queue);
handles.push(thread::spawn(move || {
for i in 0..100 {
let event = Event {
id: i,
payload: format!("Event {}", i),
};
producer_queue.push(event);
thread::sleep(Duration::from_millis(10));
}
}));
// поток, обрабатывающий события из очереди
let consumer_queue = Arc::clone(&event_queue);
handles.push(thread::spawn(move || {
while let Some(event) = consumer_queue.pop() {
println!("Processing event id: {}", event.id);
thread::sleep(Duration::from_millis(20));
}
}));
for handle in handles {
handle.join().unwrap();
}
}
Один поток добавляет события в lock-free очередь SegQueue, а другой поток извлекает и обрабатывает их.
Пул потоков для веб-сервера
Пул потоков в основном юзают в веб-серверах для обработки запросов клиентов. lock-free здесь тоже находят свое применение:
use crossbeam_queue::SegQueue;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{self, Sender};
struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Processing request {}", i);
});
}
}
Пул потоков использует канал mpsc для передачи задач.
Concurrent Hash Map
С Concurrent Hash Map можно круто кэшировать данные, разделяемые между потоками, без необходимости блокировок:
use std::sync::Arc;
use dashmap::DashMap;
use std::thread;
fn main() {
let cache = Arc::new(DashMap::new());
let mut handles = vec![];
// потоки, добавляющие данные в кэш
for i in 0..5 {
let cache = Arc::clone(&cache);
handles.push(thread::spawn(move || {
for j in 0..10 {
cache.insert(i * 10 + j, format!("Value {}", j));
}
}));
}
// потоки, читающие данные из кэша
for i in 0..5 {
let cache = Arc::clone(&cache);
handles.push(thread::spawn(move || {
for j in 0..10 {
if let Some(value) = cache.get(&(i * 10 + j)) {
println!("Read from cache: {}", value);
}
}
}));
}
for handle in handles {
handle.join().unwrap();
}
}
Юзаем DashMap, которая обеспечивает lock-free операции для вставки и чтения данных.
На моем опыте было замечено, что с lock-free структурами данных можно сократить время выполнения задач на 30-50% по сравнению с использованием мьютексов.
Больше про языки программирования эксперты OTUS рассказывают в рамках практических онлайн-курсов. С полным каталогом курсов можно ознакомиться по ссылке.
T_12
Но ведь DashMap под капотом шардирует и на каждый шард использует RwLock, разве нет?