С выходом версии 1.0.1 я решил осветить некоторые аспекты технической стороны моего проекта. В этой статье я расскажу про использование библиотеки tui-rs и про нюансы использования библиотеки tokio-rs, а так же постараюсь рассказать, что же было исправлено и почему так, как было раньше делать не надо. Добро пожаловать в статью - возможно, кому-то она поможет на тернистом пути погружения (или восхождения) в Rust.


Мой проект представляет из себя консольный клиент для игры World of Warcraft (но служит для других целей и вообще это - не то же самое). После недавнего обновления он обзавелся текстовым интерфейсом и вообще стал гораздо юзабельнее.

Отображение опкодов пакетов, отправленных с сервера
Отображение опкодов пакетов, отправленных с сервера

Но подключение интерфейса оказалось довольно объемной задачей. Начну с того, что я изначально решил выполнять генерацию интерфейса в виде отдельного таска для того, чтобы максимально инкапсулировать код, относящийся к UI. На текущий момент мои таски вызываются вот так:

join_all(vec![
	self.handle_ui_render(),
	self.handle_ui_input(),
	self.handle_ui_output(),
  
	self.handle_ai(),
	self.handle_queue(),
	self.handle_read(),
	self.handle_write(),
]).await;

таски, относящиеся к UI - их пока всего три. Помимо, собственно, таска для рендера я создаю отдельный таск для обработки событий с клавиатуры и отдельный таск для обработки отклика от интерфейса.

Типичный пример, как это работает: в процессе соединения с сервером может понадобиться сделать выбор. Например, выбор рилма (realm) или персонажа для входа в мир. Для этого необходимо показать какое-нибудь модальное окно.

Выбор рилма из списка в модальном окне
Выбор рилма из списка в модальном окне

Здесь вступает в силу следующий сценарий:

  1. я замораживаю обработку последующих пакетов (хотя, по прежнему их принимаю и сохраняю в очередь)

  2. отрисовываю модальное окно со списком персонажей или рилмов

  3. ожидаю, пока будет отправлено событие с клавиатуры (в нашем случае - нажатие стрелки вниз/вверх)

  4. перерисовываю интерфейс, но уже с выбранным элементом в списке

  5. в конце-концов, выбор сделан, требуется нажатие Enter, после чего происходит оповещение остальной части приложения о сделанном выборе

  6. и, наконец, я размораживаю обработку последующих пакетов

В коде это выглядит примерно так:

// события с клавиатуры принимаются здесь (см пункт 3 и 5)
fn handle_ui_input(&mut self) -> JoinHandle<()> {
	let income_pipe = self._income_message_pipe.lock().unwrap();
    // сюда приходят все входящие события с клавиатуры
	let event_income = income_pipe.event_income.clone();

	tokio::task::spawn_blocking(move || {
		let mut ui_input = UIInput::new(event_income);

		loop {
			ui_input.handle();
		}
	})
}

// отрисовка элементов интерфейса происходит здесь (см пункт 2 и 4)
fn handle_ui_render(&mut self) -> JoinHandle<()> {
	// ...
	let outcome_message_pipe = self._outcome_message_pipe.lock().unwrap();
    // отклик от UI от модальных окон
	let dialog_outcome = pipe.dialog_outcome.clone();
    // ...

	tokio::task::spawn_blocking(move || {
        // UI renderer, отвечающий за отрисовку интерфейса
		let mut ui = UI::new(
			CrosstermBackend::new(std::io::stdout()),
			UIOutputOptions {
				dialog_outcome,
				// ...
			},
		);

		loop {
			// ...

			ui.render(UIRenderOptions {
                // все входящие события принимаются здесь
				message: income_message_pipe.lock().unwrap().recv(),
				client_flags,
			});
		}
	})
}

fn handle_ui_output(&mut self) -> JoinHandle<()> {
	let outcome_message_pipe = Arc::clone(&self._outcome_message_pipe);
	// ...

	tokio::task::spawn_blocking(move || {
		loop {
            // принимаем исходящий отклик от UI и обрабатываем, если он имеется
			if let Ok(message) = outcome_message_pipe.lock().unwrap().recv() {
				// ...

				let mut ui_output = UIOutput::new(Arc::clone(&session), client_flags);
                // эту строку подробнее рассмотрим чуть ниже
				ui_output.handle(message);
			}
		}
	})
}

В ui_output.handle(message) на строке 57 выше происходит общение с остальной частью приложения через элементы межпроцессного взаимодействия (IPC) - в данном случае, это session , а так же через флаги приложения (созданные с помощью макроса bitflags! из одноименной библиотеки):

pub fn handle(&mut self, message: OutcomeMessageType) {
	match message {
        // получили отклик, что персонаж выбран по нажатию Enter
		OutcomeMessageType::CharacterSelected(character) => {
            // сохраняем персонажа в объект сессии
			self.session.lock().unwrap().me = Some(Player::from(character));
            // меняем флаг приложения для разморозки обработки пакетов (см пункт 6)
			self.client_flags.set(ClientFlags::IN_FROZEN_MODE, false);
		},
        // получили отклик, что рилм выбран по нажатию Enter
		OutcomeMessageType::RealmSelected(realm) => {
            // сохраняем выбранный рилм в объект сессии
			self.session.lock().unwrap().selected_realm = Some(realm);
            // меняем флаг приложения для разморозки обработки пакетов (см пункт 6)
			self.client_flags.set(ClientFlags::IN_FROZEN_MODE, false);
		},
        // ...
	};
}

Осталось рассмотреть, где происходит заморозка обработки пакетов (пункт 1).

fn handle_queue(&mut self) -> JoinHandle<()> {
    // все переменные, которые я клонирую через Arc::clone() 
    // имеют тип Arc<Mutex<...>>
	let input_queue = Arc::clone(&self._input_queue);
	let output_queue = Arc::clone(&self._output_queue);
	let session = Arc::clone(&self.session);
	let reader = Arc::clone(&self._reader);
	let writer = Arc::clone(&self._writer);
	let warden_crypt = Arc::clone(&self._warden_crypt);
	let client_flags = Arc::clone(&self.client_flags);
	let data_storage = Arc::clone(&self.data_storage);

	let income_pipe = self._income_message_pipe.lock().unwrap();
    // все входящие сообщения, которые мы хотим отобразить в debug panel
	let mut message_income = income_pipe.message_income.clone();
    // для отправки запроса на показ модального окна 
	let dialog_income = income_pipe.dialog_income.clone();

	tokio::spawn(async move {
		loop {
			let connected_to_realm = client_flags.lock().unwrap().contains(
				ClientFlags::IS_CONNECTED_TO_REALM
			);

			let packets = input_queue.lock().unwrap().pop_front();

			if packets.is_some() {
				for packet in packets.unwrap() {
					let processors = match connected_to_realm {
						true => Self::get_realm_processors(),
						false => Self::get_login_processors(),
					};

					let mut handler_input = HandlerInput {
						session: Arc::clone(&session),
						data: Some(&packet),
						data_storage: Arc::clone(&data_storage),
						message_income: message_income.clone(),
						dialog_income: dialog_income.clone(),
					};

                    // каждый процессор возвращает список хэндлеров для 
                    // определенного опкода, который мы извлекам из приходящего 
                    // с сервера пакета
					let handler_list = processors
						.iter()
						.map(|processor| processor(&mut handler_input))
						.flatten()
						.collect::<Vec<HandlerFunction>>();

					for mut handler in handler_list {
                        // каждый хэндлер выполняет одно действие и возвращает HandlerOutput
						match handler(&mut handler_input) {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {
										// ...
									},
									HandlerOutput::ConnectionRequest(host, port) => {
										// ...
									},
									HandlerOutput::UpdateState(state) => {
										// ...
									},
                                    // этот тип ответа позволяет заморозить обработку
                                    // последующих пакетов (см. пункт 1)
									HandlerOutput::Freeze => {
										client_flags.lock().unwrap().set(
											ClientFlags::IN_FROZEN_MODE,
											true
										);

										loop {
											let frozen_mode = client_flags
												.lock()
												.unwrap()
												.contains(ClientFlags::IN_FROZEN_MODE);

                                            // для отмены нужно переключить флаг где-то извне
                                            // в нашем случае - это таск handle_ui_output (см. пункт 6)
											if !frozen_mode {
												break;
											}

											sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
										}
									},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {
								message_income.send_error_message(err.to_string());
							},
						};

						sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
					}
				}
			} else {
				sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
			}
		}
	})
}

На чем еще хотелось бы остановиться, так это на нюансах создания UI тасков. Вы наверняка заметили, что я их создаю через tokio::task::spawn_blocking() - и это не просто так. Эта функция гарантирует, что таск будет создан в отдельном потоке, таким образом, он не будет блокировать другие таски и не будет блокироваться ими. Или, в моем случае, такая функция гарантирует мгновенный отклик интерфейса, поскольку перехват событий/отрисовка будут выполняться независимо от остального приложения.

Про асинхронный код

Когда я только начинал свой путь в Rust на стезе мультипоточности, мне порекомендовали перейти на tokio-rs. Тогда я еще не вдавался в детали и решил, что это хороший повод изучить новую библиотеку. Но, оказалось, что нет. Потому что я, наверное, допустил все ошибки, которые можно было допустить при использовании tokio-rs.

Начнем с того, что tokio-rs - это такой рантайм (runtime) для запуска асинхронного кода. Можно использовать и другие рантаймы или вообще написать свой, если вы видите в этом нужду. По умолчанию все таски в tokio выполняются в одном потоке, но вы можете настроить мультипоточность, подключив фичу (feature) под названием "rt-multi-thread", после чего, при необходимости, таски будут создаваться в отдельных потоках.

Первой моей ошибкой было использование async везде. Даже там, где это не нужно. Все мои таски стали асинхронными, новые таски по умолчанию записывались в асинхронные. Типичный таск выглядел примерно так:

async fn handle_ai(&mut self) -> JoinHandle<()> {
	let session = Arc::clone(&self.session);
	let data_storage = Arc::clone(&self.data_storage);
	let output_queue = Arc::clone(&self._output_queue);

	let mut movement_ai = MovementAI::new();

	tokio::spawn(async move {
		loop {
			movement_ai.manage(AIManagerInput {
				session: Arc::clone(&session),
				data_storage: Arc::clone(&data_storage),
				output_queue: Arc::clone(&output_queue),
			}).await;

			sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
		}
	})
}

// далее таски вызывались примерно так
join_all(vec![
	self.handle_ui_render().await,
	self.handle_ui_input().await,
	self.handle_ui_output().await,
	self.handle_ai().await,
	self.handle_queue().await,
	self.handle_read().await,
	self.handle_write().await,
]).await;

Постарайтесь самостоятельно определить, какие в коде выше ошибки.

Вот какие

async в определении функции - не нужен, так как асинхронный код присутствует только в теле блока async, которое передается в tokio::spawn, соответственно, вызов функций в join_all должен быть без await, т.к. там нечего эвэйтить.

Т.е., грубо говоря, код выше - избыточен.

Вторая ошибка была серьезнее - и привела к дэдлокам (deadlock). В погоне за необходимостью использовать mutex для обеспечения совместного доступа (Shared Ownership) к внешним переменным я стал использовать tokio::sync::mutex . В результате (из-за гонки ака race condition) попал в несколько ситуаций, когда какой-нибудь client_flags блокируется в одном из тасков и не дает выполняться другому таску, который зависит от искомого. Типичный дэдлок может выглядеть вот так:

async fn handle_queue(&mut self) -> JoinHandle<()> {
	// ...

	tokio::spawn(async move {
		loop {
			let connected_to_realm = client_flags.lock().unwrap().contains(
				ClientFlags::IS_CONNECTED_TO_REALM
			);
            // дэдлок здесь
			if let Some(packets) = input_queue.lock().await.pop_front() {
				for packet in packets.unwrap() {
					// ...

					for mut handler in handler_list {
						match handler(&mut handler_input) {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {},
									HandlerOutput::ConnectionRequest(host, port) => {},
									HandlerOutput::UpdateState(state) => {},
									HandlerOutput::Freeze => {},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {},
						};

						sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
					}
				}
			} else {
				sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
			}
		}
	})
}
Детали здесь

Дэдлок на строке 10. input_queue блокируется на время существования блока if. Вариант решения - вынести input_queue.lock().await.pop_front() в переменную.

Но лучше - не использовать (по возможности) tokio::sync::mutexвообще. В самой документации tokio-rs рекомендуется использовать асинхронный mutex с умом (например, для соединений с базой данных), а в большинстве случаев - использовать std::sync::mutex (т.е. синхронный mutex из стандартной библиотеки).

Вобщем, классический пример, когда сначала прочитал документации и сделал как надо понаставил везде асинхронных mutex, нахватал дэдлоков, после чего по ссылке со stackoverflow прочитал документацию и, наконец, героически пофиксил сделал как надо.

Отдельно хотелось бы рассмотреть такую структуру как Reader. До недавнего обновления он выглядел примерно так:

pub struct Reader {
    stream: OwnedReadHalf,
    decryptor: Option<Decryptor>,
    warden_crypt: Arc<Mutex<Option<WardenCrypt>>>
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            stream: reader,
            decryptor: None,
            warden_crypt: Arc::new(Mutex::new(None))
        }
    }

    pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<Mutex<Option<WardenCrypt>>>) {
        self.decryptor = Some(Decryptor::new(session_key));
        self.warden_crypt = warden_crypt;
    }

    pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        let mut buffer = [0u8; 65536];
        // здесь проблема, используется блокирующий read
        match self.stream.read(&mut buffer).await {
            Ok(bytes_count) => {
                let result = match self.decryptor.as_mut() {
                    Some(decryptor) => {
                        let warden_crypt = &mut *self.warden_crypt.lock().await;

                        Self::parse_packets(
                            buffer[..bytes_count].to_vec(),
                            decryptor,
                            warden_crypt.as_mut().unwrap(),
                        )
                    },
                    _ => {
                        vec![buffer[..bytes_count].to_vec()]
                    },
                };

                Ok(result)
            },
            Err(err) => {},
        }
    }
}

Асинхронный .read() на строке 24 выше стал тоже своего рода блокером, когда в зависимости от сервера мне приходилось играться с значением READ_TIMEOUT , чтобы код работал. Я думал, что причина - в пинге, пока не отрефакторил приложение и заменил, наконец, блокирующий .read() на неблокирующий .try_read() :

pub fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
	let mut buffer = [0u8; 65536];

	match self.stream.try_read(&mut buffer) {
		Ok(bytes_count) => {
			let result = match self.decryptor.as_mut() {
				// ...
			};

			Ok(result)
		},
		Err(err) => {
			// ...
		},
	}
}

после этого фикса приложение стало работать быстрее (с учетом, что на некоторых серверах READ_TIMEOUT был равен 250-300).

Итого

После избавления от избыточного async/await приложение стало работать быстрее и предсказуемее. С позиции своего опыта я бы рекомендовал использовать async/await с умом, в частности, внимательно подходить к выбору mutex (tokio-rs или стандартная библиотека).

Одна из сложностей, с которой я столкнулся на этапе разработки - отсутствие местами качественной документации. Взять тот же tui-rs, было непонятно, что Layout можно дробить и использовать части в другом Layout, пока я не нашел кусок кода с примером в другом проекте github. Или было непонятно, как создать модальное окно, чтобы оно перекрывало часть уже отрисованного интерфейса (нужно рендерить виджет Clear).

Поэтому, я бы хотел, чтобы мой проект, в частности, мог быть использован в качестве примеров: работы со сложными Layout, нюансов отрисовки модалок или примера, как можно использовать ввод с клавиатуры в полноценном приложении.

Так же я надеюсь, что эта статья поможет вам избежать дэдлоков.

И, конечно, я буду рад всем, кто захочет сделать свой вклад в мой проект: https://github.com/idewave/idewave-cli

Комментарии (3)


  1. zblsv
    09.08.2022 14:15
    +2

    +500 Статья именно того формата, который лично мне нужен от Хабра. Пишите о том, что вы делаете и о том, что вы знаете: как делали и что знаете. Не надо о том, как лучше сделать и о том, что все знают... Надо только про себя.


    1. ivanuzzo Автор
      09.08.2022 16:00

      рад, что статья вам понравилась !


  1. Crash13
    09.08.2022 22:10
    +1

    Автор, пеши еще!