На работе наша команда недавно столкнулась с необходимостью в высокопроизводительном IPC в Rust. Поиск привел нас к содержательной статье от 3tilley «IPC in Rust — a Ping Pong Comparison», что стало превосходной отправной точкой в нашем исследовании.

Вдохновляясь этой работой, мы решили копнуть глубже и провести собственные замеры производительности, в особенности нас интересовал новый многообещающий фреймворк iceoryx2. Взяв за основу работу в исходной статье, мы будем использовать UNIX Domain Sockets (как stream, так и datagram), Memory Mapped Files и Shared Memory с использованиемiceoryx2 для сравнения производительности IPC между процессами на одной машине для различных размеров пэйлоада.

Сетап

Мы будем измерять время, потраченное на выполнение цикла «запрос‑ответ» между двумя процессами, общающимися с помощью IPC. Все эксперименты схожи с показанными в оригинальной статье: 1. Отправитель генерирует запрос размером x KB 2. Получатель отвечает пэйлоадом такого же размера x KB 3. Мы измеряем время, затраченное на обмен сообщениями

Для выполнения измерений будет использоваться тот же инструмент Divan.

Железо и ОС

  • Env: Linux Cloud VM

  • Arch: x86_64

  • CPU: 32 cores

  • Model Name: AMD EPYC 7B13

  • L1d cache: 512 KiB

  • L1i cache: 512 KiB

  • L2 cache: 8 MiB

  • L3 cache: 64 MiB

  • Memory: 128GB

  • OS: Ubuntu 20.04.1

  • Kernel: 5.15.0

Оптимизации и техники измерения

  1. CPU Affinity: Привязка процессов к определенным ядрам предотвращает миграцию процессов между ядрами, из‑за которой может возникнуть вариативность в измерениях, вызванная переключениями контекста. Для этого используется core_affinity.

  2. CPU Warmup: Чтобы учесть первоначальные состояния низкого потребления энергии и добиться стабильной частоты, мы добавили период «прогрева» в 1 секунду перед выполнением замеров. Это позволяет процессору достичь устойчивого состояния производительности, предоставляя более точные и консистентные результаты с использованием различных механизмов IPC.

pub fn cpu_warmup() {
    let warmup = std::time::Instant::now();
    loop {
        if warmup.elapsed() > std::time::Duration::from_millis(1000) {
            break;
        }
    }
}

Генерация пэйлоада

В качестве пэйлоада используются случайно сгенерированные строки, состоящие из латинских букв разного регистра и цифр.

pub fn generate_random_data(data_size: usize, seed: u64) -> Vec<u8> {
    const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
                            abcdefghijklmnopqrstuvwxyz\
                            0123456789";
    let mut rng = StdRng::seed_from_u64(seed);
    (0..data_size)
        .map(|_| {
            let idx = rng.gen_range(0..CHARSET.len());
            CHARSET[idx]
        })
        .collect()
}

Поскольку передача данных происходит побайтово, мы генерируем строки как векторы байт (Vec<u8>) заданного размера. seed позволяет генерировать одинаковый пэйлоад для заданного data_size, обеспечивая консистентность данных для каждого тестируемого подхода.

pub fn get_payload(data_size: usize) -> (Vec<u8>, Vec<u8>) {
    let request_data = generate_random_data(data_size, 1);
    let response_data = generate_random_data(data_size, 2);
    (request_data, response_data)
}

Функция get_payload возвращает данные запроса и ответа.

Подход 1 - UNIX Domain Stream Socket

UNIX Domain Sockets(UDS) обычно считаются более предпочтительными для обмена данными между процессами на одной UNIX‑машине по сравнению с IP‑сокетами. Они заявляются как более легкие и быстрые по сравнению с последними, поскольку не выполняют ряд операций, требуемых для сетевых интерфейсов.

Ниже упрощенная реализация использования UNIX Domain Stream Socket:

...
// Producer
const UNIX_SOCKET_PATH: &str = "/tmp/unix_stream.sock";

impl UnixStreamRunner {
    pub fn new(start_child: bool, data_size: usize) -> Self {
        let unix_listener = UnixListener::bind(UNIX_SOCKET_PATH).unwrap();
        let exe = crate::executable_path("unix_stream_consumer");
        let child_proc = if start_child {
            Some(Command::new(exe).args(&[data_size.to_string()]).spawn().unwrap())
        } else {
            None
        };

        let wrapper = UnixStreamWrapper::from_listener(unix_listener);
        let (request_data, response_data) = get_payload(data_size);

        Self { child_proc, wrapper, data_size, request_data, response_data }
    }

    pub fn run(&mut self, n: usize) {
        let mut buf = vec![0; self.data_size];
        for _ in 0..n {
            self.wrapper.stream.write(&self.request_data).unwrap();
            self.wrapper.stream.read_exact(&mut buf).unwrap();
        }
    }
}
...

// Cosumer
fn main() {
    let args: Vec<String> = std::env::args().collect();
    let data_size = usize::from_str(&args[1]).unwrap();

    core_affinity::set_for_current(core_affinity::CoreId { id: 0 });

    let mut wrapper = ipc::unix_stream::UnixStreamWrapper::unix_connect();
    let (request_data, response_data) = get_payload(data_size);
    cpu_warmup();

    let mut buf = vec![0; data_size];
    while let Ok(_) = wrapper.stream.read_exact(&mut buf) {
        wrapper.stream.write(&response_data).unwrap();
    }
}

Для потоковых сокетов мы предоставляем путь в файловой системе, который будет использоваться в качестве адреса сокета. После соединения в сокет записывается request_data, а получатель читает данные и записывает в буфер, после чего отвечает с помощью response_message.

Подход 2 - Unix Datagram Socket

В процессе реализации UNIX domain datagram socket мы столкнулись с некоторыми трудностями в рантайме:

  1. «Address already in use»: Первоначально мы пытались использовать один путь сокета /tmp/unix_datagram.sock для коммуникации, подобно потоковым сокетам. Мы пытались привязать оба процесса к одному пути, что приводило к ошибке рантайма «Address already in use». Потом мы узнали, что в случае использования сокетов датаграмм необходимо привязать каждый процесс к отдельному пути, после чего для общения между процессами необходимо соединить сокеты друг с другом с помощью connect(), что выставит адрес пира.

  2. «Message too big»: Данная ошибка возникает из‑за ограничений на размер датаграммы UDP. Для обработки больших размеров данных необходимо отправлять их частями.

  3. «No buffer space available»: В процессе поиска решения данной ошибки, мы обнаружили на stackoverflow, что данная ошибка возникает, когда ядро не может выделить память под буфер сокета и отправка данных через какой‑либо из сокетов невозможна, пока не будет освобождена память. Мы решили данную ошибку путем создания механизма повторной отправки. Мы пытаемся отправить сообщение до тех пор, пока не добьемся успешной записи.

  4. Потеря данных: Нам потребовался бы более точное управление потоком на стороне UDP, чтобы избежать потерь пакетов при большом размере данных. Поскольку наша основная задача заключается в измерении разницы между IPC на основе общей памяти и другими подходами, мы пока опустим данную задачу. Для избежания паник в случае периодических потери или повреждения данных, мы убрали строгие проверки корректности данных. Это позволяет продолжить выполнение программы, даже если часть пакетов будет потеряна или поступит в другом порядке.

Ниже упрощенная реализация:

...
// Producer
const MAX_CHUNK_SIZE: usize = 64 * KB;
const UNIX_DATAGRAM_SOCKET_1: &str = "/tmp/unix_datagram1.sock";
const UNIX_DATAGRAM_SOCKET_2: &str = "/tmp/unix_datagram2.sock";

impl UnixDatagramWrapper {
    pub fn new(is_child: bool, data_size: usize) -> Self {
        let (socket_path, peer_socket_path) = if is_child {
            (UNIX_DATAGRAM_SOCKET_1, UNIX_DATAGRAM_SOCKET_2)
        } else {
            (UNIX_DATAGRAM_SOCKET_2, UNIX_DATAGRAM_SOCKET_1)
        };
        let socket = UnixDatagram::bind(socket_path).unwrap();
        Self { socket, peer_socket_path, data_size, }
    }

    pub fn connect_to_peer(&self) {
        self.socket.connect(&self.peer_socket_path).unwrap();
    }

    pub fn send(&self, data: &Vec<u8>) {
        // Send data in chunks
        for chunk in data.chunks(MAX_CHUNK_SIZE) {
            // Retry until we have a successful write
            loop {
                match self.socket.send(chunk) {
                    Ok(_) => break,
                    Err(_) => continue,
                }
            }
        }
    }

    pub fn recv(&self) -> Vec<u8> {
        let mut received_data = Vec::new();
        // Read till we receive all chunks
        loop {
            let mut buf = vec![0; MAX_CHUNK_SIZE];
            let size = self.socket.recv(&mut buf).unwrap();
            received_data.extend_from_slice(&buf[..size]);
            if received_data.len() == self.data_size {
                break;
            }
        }
        received_data
    }
}

impl UnixDatagramRunner {
    pub fn new(start_child: bool, data_size: usize) -> Self {
        let is_child = false;
        let wrapper = UnixDatagramWrapper::new(is_child, data_size);
        let (request_data, response_data) = get_payload(data_size);

        let exe = crate::executable_path("unix_datagram_consumer");
        let child_proc = if start_child {
            Some(Command::new(exe).args(&[data_size.to_string()]).spawn().unwrap(),)
        } else {
            None
        };
        // Awkward sleep to make sure the child proc is ready
        sleep(Duration::from_millis(500));
        wrapper.connect_to_peer();

        Self { child_proc, wrapper, request_data, response_data, }
    }

    pub fn run(&mut self, n: usize, print: bool) {
        let start = Instant::now();
        for _ in 0..n {
            self.wrapper.send(&self.request_data);
            let _response = self.wrapper.recv();
        }
    }
}
...

// Consumer
fn main() {
    let args: Vec<String> = std::env::args().collect();
    let data_size = usize::from_str(&args[1]).unwrap();

    core_affinity::set_for_current(core_affinity::CoreId { id: 0 });

    let is_child = true;
    let socket_wrapper = ipc::unix_datagram::UnixDatagramWrapper::new(is_child, data_size);
    socket_wrapper.connect_to_peer();

    let (request_data, response_data) = get_payload(data_size);

    cpu_warmup();
    loop {
        let _request = socket_wrapper.recv();
        socket_wrapper.send(&response_data);
    }
}

Для каждого процесса мы привязываем сокеты с помощью bind() к socket_path и соединяем их к соответствующим peer_socket_path. Затем мы отправляем request_data частями и повторяем попытки, пока все данные не будут успешно отправлены. Принимающий процесс читает данные, пока все части не будут получены и отвечает, используя response_data.

Подход 3 - Memory Mapped Files

Отображение файлов в память это метод доступа к их содержимому, отображая их на участок памяти в адресном пространстве вызывающего процесса. Чтобы сделать изменения, внесенные одним из процессов видимыми другому, мы вызываем mmap() с флагом MAP_SHARED. Синхронизация между процессами осуществляется с использованием raw_sync, как и в оригинальной статье.

// Shared memory layout
//|    0    |    1    |    2    |    3    |    4    |    5    |    6    |    7    |
//|   producer lock   |   consumer lock   |      data buffer (ping or pong)       |
pub struct MmapWrapper {
    pub mmap: MmapMut,
    pub owner: bool,
    pub our_event: Box<dyn EventImpl>,
    pub their_event: Box<dyn EventImpl>,
    pub data_start: usize,
    pub data_size: usize,
}

impl MmapWrapper {
    pub fn new(owner: bool) -> Self {
        let path: PathBuf = "/tmp/mmap_data.txt".into();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)
            .unwrap();
        file.set_len(8).unwrap();

        let mut mmap = unsafe { MmapMut::map_mut(&file).unwrap() };
        let bytes = mmap.as_mut();

        // The two events are locks - one for each side. Each side activates the lock while it's
        // writing, and then unlocks when the data can be read
        let ((our_event, lock_bytes_ours), (their_event, lock_bytes_theirs)) = unsafe {
            if owner {
                (
                    BusyEvent::new(bytes.get_mut(0).unwrap(), true).unwrap(),
                    BusyEvent::new(bytes.get_mut(2).unwrap(), true).unwrap(),
                )
            } else {
                (
                    // If we're not the owner, the events have been created already
                    BusyEvent::from_existing(bytes.get_mut(2).unwrap()).unwrap(),
                    BusyEvent::from_existing(bytes.get_mut(0).unwrap()).unwrap(),
                )
            }
        };

        // Confirm that we've correctly indexed two bytes for each lock
        assert!(lock_bytes_ours <= 2);
        assert!(lock_bytes_theirs <= 2);
        if owner {
            our_event.set(EventState::Clear).unwrap();
            their_event.set(EventState::Clear).unwrap();
        }

        Self {
            mmap,
            owner,
            our_event,
            their_event,
            data_start: 4,
            data_size,
        }
    }

    pub fn signal_start(&mut self) {
        self.our_event.set(EventState::Clear).unwrap()
    }

    pub fn signal_finished(&mut self) {
        self.our_event.set(EventState::Signaled).unwrap()
    }

    pub fn write(&mut self, data: &[u8]) {
        (&mut self.mmap[self.data_start..]).write(data).unwrap();
    }

    pub fn read(&self) -> &[u8] {
        &self.mmap.as_ref()[self.data_start..self.data_size]
    }
}

Функция MmapMut::map_mut() создает общую карту распределения памяти с возможностью записи на основе файла. Первые 4 байта в отображении памяти зарезервированы под управление событиями, а остальные 4 — для хранения данных.

pub fn run(&mut self, n: usize, print: bool) {
    for _ in 0..n {
        // Activate our lock in preparation for writing
        self.wrapper.signal_start();
        self.wrapper.write(&self.request_data);
        // Unlock after writing
        self.wrapper.signal_finished();
        // Wait for their lock to be released so we can read
        if self.wrapper.their_event.wait(Timeout::Infinite).is_ok() {
            let str = self.wrapper.read();
        }
    }
}

Мы блокируем, записываем, разблокируем и читаем данные, когда это возможно.

Подход 4 - общая память с использованием iceoryx2

Библиотека iceoryx2 предоставляет возможность общения между процессами без локов и копирования данных. На данный момент библиотека не поддерживает механизм запрос‑ответ, так что мы будем использовать их механизм публикации‑подписки для измерения времени цикла запрос‑ответ.

Упрощенная реализация:

...
// Producer
pub struct IceoryxWrapper {
    pub publisher: Publisher<ipc::Service, [u8], ()>,
    pub subscriber: Subscriber<ipc::Service, [u8], ()>,
}

impl IceoryxWrapper {
    pub fn new(is_producer: bool, data_size: usize) -> IceoryxWrapper {
        let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
        let request_name = ServiceName::new(&format!("Request")).unwrap();
        let request_service = node
            .service_builder(&request_name)
            .publish_subscribe::<[u8]>()
            .open_or_create()
            .unwrap();

        let response_name = ServiceName::new(&format!("Respose")).unwrap();
        let response_service = node
            .service_builder(&response_name)
            .publish_subscribe::<[u8]>()
            .open_or_create()
            .unwrap();

        let (publisher, subscriber) = if is_producer {
            (
                request_service
                    .publisher_builder()
                    .max_slice_len(data_size)
                    .create()
                    .unwrap(),
                response_service.subscriber_builder().create().unwrap(),
            )
        } else {
            (
                response_service
                    .publisher_builder()
                    .max_slice_len(data_size)
                    .create()
                    .unwrap(),
                request_service.subscriber_builder().create().unwrap(),
            )
        };

        IceoryxWrapper {
            publisher,
            subscriber,
        }
    }
}

impl IceoryxRunner {
    pub fn run(&mut self, n: usize, print: bool) {
        for _ in 0..n {
            let sample = self
                .wrapper
                .publisher
                .loan_slice_uninit(self.data_size)
                .unwrap();
            let sample = sample.write_from_slice(self.request_data.as_slice());
            sample.send().unwrap();

            // Waiting for response
            loop {
                if let Some(recv_payload) = self.wrapper.subscriber.receive().unwrap() {
                    break;
                }
            }
        }
    }
}
...

// Consumer
fn main() {
    let args: Vec<String> = std::env::args().collect();
    let data_size = usize::from_str(&args[1]).unwrap();

    core_affinity::set_for_current(core_affinity::CoreId { id: 0 });

    let wrapper = ipc::iceoryx::IceoryxWrapper::new(false, data_size);
    let (request_data, response_data) = get_payload(data_size);

    cpu_warmup();
    loop {
        if let Some(recv_payload) = wrapper.subscriber.receive().unwrap() {
            let sample = wrapper.publisher.loan_slice_uninit(data_size).unwrap();
            let sample = sample.write_from_slice(response_data.as_slice());
            sample.send().unwrap();
        }
    }
}

Мы определили два publish-subscribe сервиса. Один сервис используется для отправки пэйлоада запроса, второй - для возвращения ответа на него. После публикации request_data принимающий процесс читает сообщение, подписавшись на отправляющий сервис. Затем принимающий сервис публикует response_data отправляющем сервису, который подписан на принимающий и читает ответ.

Результаты

Цикл Ping-Pong

Некоторые наблюдения на основе производительности цикла отправки-получения:

  1. Подходы общего использования памяти и отображения файлов в память стабильно наиболее быстрые с наименьшем временем на операцию и наиболее высоким количеством операций в секунду.

  2. iceoryx2 показывает себя крайне выгодно и значительно быстрее традиционных методов IPC, таких как пайпы, сокеты и доменные сокеты Unix.

  3. Хотя значительной разницы не наблюдается, доменные сокеты Unix показывают себя лучше IP-сокетов.

Различные размеры сообщений

Время на операцию (в µs)

Количество операций в секунду (в тысячах)

Некоторые наблюдения:

  1. Первое наблюдение, следующее из графика в явной разнице в производительности между методам на основе использования памяти и традиционными методами IPC.

  2. Для небольших (от 1 КБ до 8 КБ) размеров сообщений файлы с отображением в памяти и использование общей памяти стабильно показывают лучший результат с наименьшим временем на операцию и наибольшим количеством операций в секунду. iceoryx2 удерживает относительно плоский профиль задержки до 8 КБ.

  3. По мере увеличения размера сообщения разница в производительности между iceoryx2 и использованием общей памяти/файлов с отображением в память значительно снижается и производительность становится одинаковой.

  4. Среди традиционных методов IPC потоковые сокеты Unix имеют лучшую производительность для всех размеров сообщений.

  5. Для пэйлоада размером больше 64 КБ наблюдается значительное снижение производительности, которое может быть вызвано ограничениями в максимальном размере датаграммы.

  6. Использование общей памяти и файлы, отображенные в память, показывают высокую производительность для всех размеров пэйлоада.

  7. iceoryx2 сохраняет конкурентоспособную пропускную способность, особенно для больших пэйлоадов.

  8. Традиционные методы IPC показывают значительно более низкую пропускную способность с сильным ее снижением по мере увеличения размера сообщения.

Подводя итоги

iceoryx2 показывает себя вполне привлекательно в ситуациях, когда требуется высокая производительность в сообщении между процессами. И хотя для сообщений небольшого размера она показывает себя медленнее использования общей памяти напрямую или файлов с отображением в памяти, у нее есть положительная сторона в виде абстрагирования ручной синхронизации, необходимой для других методов. Это приводит к хорошему балансу между производительностью и удобством разработки, делая ее привлекательным вариантом для высокопроизводительной IPC в проектах на Rust.

Особая благодарность elBoberido and Abhirag за ревью поста и предложенные улучшения.

Полный исходный код доступен наGithub и открыт к критике и улучшениям ?

Комментарии (1)


  1. AlexPTS
    19.10.2024 06:10

    Можно добавить в сравнение еще SystemV Message queues и SystemV shared memory с семафорами