Привет, Хабр! Сегодня поговорим о lock-free (или же без использования блокировок) структурах данных и атомарных операциях в Rust.

Каждый lock может стать узким местом, тормозящим всю систему. Базовые методы синхронизации, типо мьютексов и семафор, частенько (но не всегда) снижают производительность из-за блокировок и контекстных переключений.

lock-free структуры данных позволяют нескольким потокам одновременно читать и изменять данные без блокировок.

Основы lock-free программирования в Rust

Rust дает безопасность многопоточной работы благодаря своей модели владения и системе типов:

  1. Модель владения

    • Каждый ресурс в Rust имеет владельца, и только один поток может быть владельцем ресурса в конкретный момент времени. Это предотвращает возможность конкурентного доступа к одному и тому же ресурсу без синхронизации.

    • Жизненные циклы контролят время жизни ссылок, предотвращая использование недействительных ссылок.

  2. Типы и проверка на этапе компиляции

    • Типаж Send: этот типаж указывает, что тип данных может быть безопасно передан между потоками.

    • Типаж Sync: указывает, что тип данных может быть безопасно доступен из нескольких потоков одновременно.

Rust проверяет правильность использования типажей Send и Sync на этапе компиляции.

use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let mut handles = vec![];

    for _ in 0..3 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            println!("{:?}", data);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Здесь юзаем Arc для безопасного разделения данных между потоками. Типаж Send позволяет перемещать Arc между потоками, а Sync дает безопасный доступ к данным.

Атомарные типы данных в Rust позволяют выполнять операции над данными без блокировок.

Основные атомарные типы данных в стандартной библиотеке Rust включают:

  1. AtomicBool

    • Атомарная булевая переменная.

    • Пример:

      use std::sync::atomic::{AtomicBool, Ordering};
      use std::thread;
      
      let flag = AtomicBool::new(false);
      
      let handle = thread::spawn(move || {
          flag.store(true, Ordering::Relaxed);
      });
      
      handle.join().unwrap();
      assert_eq!(flag.load(Ordering::Relaxed), true);
  2. AtomicIsize и AtomicUsize

    • Атомарные целые числа (signed и unsigned).

    • Пример:

      use std::sync::atomic::{AtomicUsize, Ordering};
      use std::thread;
      
      let counter = AtomicUsize::new(0);
      let handles: Vec<_> = (0..10).map(|_| {
          let counter = &counter;
          thread::spawn(move || {
              for _ in 0..1000 {
                  counter.fetch_add(1, Ordering::SeqCst);
              }
          })
      }).collect();
      
      for handle in handles {
          handle.join().unwrap();
      }
      
      assert_eq!(counter.load(Ordering::SeqCst), 10000);
  3. AtomicPtr

    • Атомарный указатель.

    • Пример:

      use std::sync::atomic::{AtomicPtr, Ordering};
      use std::ptr;
      
      let mut data = 5;
      let atomic_ptr = AtomicPtr::new(&mut data);
      
      let new_data = 10;
      atomic_ptr.store(&mut new_data, Ordering::SeqCst);
      
      assert_eq!(unsafe { *atomic_ptr.load(Ordering::SeqCst) }, 10);

Атомарные операции обеспечивают безопасность и целостность данных без необходимости использования блокировок!

Crossbeam для lock-free структур данных

Библиотека crossbeam предназначена для упрощения многопоточного программирования и высокопроизводительных безопасных структур данных lock-free.

Основные элементы Crossbeam:

  • Структуры данных: многопоточные структуры данных, такие как очереди и деки.

  • Epoch-based garbage collection: механизм управления памятью, который минимизирует блокировки при сборе мусора.

  • Concurrency primitives: синхронизационные примитивы для упрощения написания конкурентного кода.

ArrayQueue

ArrayQueue представляет собой lock-free очередь, реализованную на массиве с фикс. размером. Эта очередь идеально подходит для сценариев, где известно макс. количество элементов, которое нужно хранить. Пример:

use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    let queue = Arc::new(ArrayQueue::new(100));
    let mut handles = vec![];

    for i in 0..5 {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            for j in 0..20 {
                queue.push(i * 20 + j).unwrap();
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    while let Ok(val) = queue.pop() {
        println!("{}", val);
    }
}

Несколько потоков добавляют элементы в ArrayQueue, а затем элементы извлекаются из очереди.

SegQueue

SegQueue — это lock-free очередь с динамическим расширением, которая подходит для сценариев с неопределенным количеством элементов. Пример:

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    let queue = Arc::new(SegQueue::new());
    let mut handles = vec![];

    for i in 0..5 {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            for j in 0..20 {
                queue.push(i * 20 + j);
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    while let Some(val) = queue.pop() {
        println!("{}", val);
    }
}

SegQueue используется аналогично ArrayQueue, но не ограничена фиксированным размером.

epoch-based garbage collection

Epoch-based garbage collection юзают для управления памятью без блокировок. Основная фича состоит в том, чтобы разделить время на epochs и отслеживать, когда каждый поток находится в активном состоянии.

Из чего состоит epoch-based GC:

  • Global epoch counter: глобальный счетчик эпох, который отслеживает текущую эпоху.

  • Thread-local epoch counter: локальный для потока счетчик эпох, который обновляется при каждом входе в критическую секцию.

  • Garbage lists: списки мусора для каждой epoch, в которые добавляются освобождаемые объекты.

Пример:

use crossbeam_epoch as epoch;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

struct Node {
    value: i32,
    next: AtomicPtr<Node>,
}

fn main() {
    let n1 = Box::into_raw(Box::new(Node {
        value: 1,
        next: AtomicPtr::new(ptr::null_mut()),
    }));

    let n2 = Box::into_raw(Box::new(Node {
        value: 2,
        next: AtomicPtr::new(n1),
    }));

    let head = AtomicPtr::new(n2);

    epoch::pin(|scope| {
        let h = head.load(Ordering::Relaxed, scope);
        unsafe {
            if !h.is_null() {
                println!("Value: {}", (*h).value);
            }
        }
    });

    unsafe {
        drop(Box::from_raw(n1));
        drop(Box::from_raw(n2));
    }
}

Создали простую lock-free структуру данных и юзаем epoch-based GC для управления памятью.

Примеры использования

Многопоточная обработка событий в серверной системе

В серверных приложениях нужно обрабатывать множество событий, поступающих от различных клиентов. С lock-free очередями можно избежать задержек, связанных с блокировками:

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Event {
    id: u32,
    payload: String,
}

fn main() {
    let event_queue = Arc::new(SegQueue::new());
    let mut handles = vec![];

    // поток, добавляющий события в очередь
    let producer_queue = Arc::clone(&event_queue);
    handles.push(thread::spawn(move || {
        for i in 0..100 {
            let event = Event {
                id: i,
                payload: format!("Event {}", i),
            };
            producer_queue.push(event);
            thread::sleep(Duration::from_millis(10));
        }
    }));

    // поток, обрабатывающий события из очереди
    let consumer_queue = Arc::clone(&event_queue);
    handles.push(thread::spawn(move || {
        while let Some(event) = consumer_queue.pop() {
            println!("Processing event id: {}", event.id);
            thread::sleep(Duration::from_millis(20));
        }
    }));

    for handle in handles {
        handle.join().unwrap();
    }
}

Один поток добавляет события в lock-free очередь SegQueue, а другой поток извлекает и обрабатывает их.

Пул потоков для веб-сервера

Пул потоков в основном юзают в веб-серверах для обработки запросов клиентов. lock-free здесь тоже находят свое применение:

use crossbeam_queue::SegQueue;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{self, Sender};

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Worker {} got a job; executing.", id);
            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Processing request {}", i);
        });
    }
}

Пул потоков использует канал mpsc для передачи задач.

Concurrent Hash Map

С Concurrent Hash Map можно круто кэшировать данные, разделяемые между потоками, без необходимости блокировок:

use std::sync::Arc;
use dashmap::DashMap;
use std::thread;

fn main() {
    let cache = Arc::new(DashMap::new());
    let mut handles = vec![];

    // потоки, добавляющие данные в кэш
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for j in 0..10 {
                cache.insert(i * 10 + j, format!("Value {}", j));
            }
        }));
    }

    // потоки, читающие данные из кэша
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for j in 0..10 {
                if let Some(value) = cache.get(&(i * 10 + j)) {
                    println!("Read from cache: {}", value);
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Юзаем DashMap, которая обеспечивает lock-free операции для вставки и чтения данных.


На моем опыте было замечено, что с lock-free структурами данных можно сократить время выполнения задач на 30-50% по сравнению с использованием мьютексов.

Больше про языки программирования эксперты OTUS рассказывают в рамках практических онлайн-курсов. С полным каталогом курсов можно ознакомиться по ссылке.

Комментарии (1)


  1. T_12
    26.06.2024 19:51

    Но ведь DashMap под капотом шардирует и на каждый шард использует RwLock, разве нет?