Если опустить первое и самое главное предубеждение относительно питонячьей многопоточности у большинства программистов — что её не существует из-за GIL, — то остается другое, и, наверное, вполне достоверное: многопоточность — это сложно, и нам этого, пожалуйста, не надо. И знаете что? Так оно и есть. Многопоточность — это сложно, особенно когда выбираешься за пределы стандартных руководств и попадаешь со своей многопоточной поделкой в реальный мир. И, возможно, вам не нужно. Ни здесь, ни далее я не буду обсуждать целесообразность написания многопоточного кода на Python и сразу перейду к тому, как это делать.

Если эта статья всем понравится, за ней может последовать серия на ту же тему: как писать многопоточный код с примерами на Python. Конкретно в этой статье примеры будут взяты из фреймворка Polog, который я пишу в свободное время. Он предназначен для логирования, но я постарался вынести все базовые принципы и паттерны в эту статью в специфико-агностичной форме. А там, где без специфики фреймворка не обойтись, буду указывать, в чём именно она состоит. Хочется верить, что это даже поспособствует усвоению материала за счёт каких-то ассоциативных связей, что ли.

Дисклеймер

Несмотря на то, что я привожу собственный код в качестве примера, я тем самым не утверждаю, что он написан лучшим из возможным способов. Более того, я даже не уверен, что вы не сможете найти в нём каких-либо (возможно, фатальных) недостатков. Если вдруг вы в состоянии это сделать, было бы круто, не сдерживайте себя.

Кроме того, здесь не будет ликбеза про многопоточность. Предполагается, что вы уже прочитали что-то вроде документации к модулю threading из стандартной библиотеки, и в целом хотя бы немного знакомы с темой. Вам не нужно объяснять, чем потоки отличаются от вашей бабушки, кто такие эти наши локи (и каковы их семейные взаимоотношения с дедом), что такое состояние гонки и всякое подобное. Про это всё вы уже сами прочитали и теперь вам интересно не столько то, что такое многопоточность, сколько то, как её применение может выглядеть в конкретном проекте.

Чего мы хотим?

Существует самый простой и очевидный способ хранить настройки приложения в качестве обычных переменных внутри модуля или класса. Его используют в том числе крупные веб-фреймворки, и я поначалу делал так же. Так почему же по итогу пришлось всё переделывать? Дело в том, что к хранению настроек внутри фреймворка возникли довольно специфические требования:

  • Прежде всего, необходима гибкая валидация значений. Причём, поскольку это библиотека, пользователь должен получать информативное исключение с описанием того, что именно он сделал не так, когда попытался применить заведомо невозможную настройку (например, установить какой-нибудь интервал времени в виде отрицательной величины).

  • Необходимо валидировать не просто значения сами по себе, но и их комбинации. К примеру, в Polog поддерживается два типа движков: синхронный (без использования воркеров в отдельных потоках) и асинхронный. Синхронный подгружается, когда настройка pool_size установлена в значение 0, асинхронный — при любом значении больше нуля. Асинхронный движок работает по схеме producer–consumer с передачей данных через очередь. Так вот, для этой самой очереди можно установить лимит — целое число больше нуля, а можно не устанавливать, оставив значение 0 по умолчанию. «Невозможной» комбинацией является pool_size равный нолю, и при этом max_queue_size (лимит размера очереди) больше нуля, поскольку никакой очереди в синхронном движке просто нет. Хотя каждое отдельное из значений — нулевой размер пула и ненулевой размер очереди — само по себе валидно, при попытке установить такую комбинацию мы тоже должны кинуть исключение с описанием конфликтующих полей.

  • Нужна возможность изменять настройки в любой момент. Исходно всё работало так: перед записью самого первого лога вы указывали нужные значения для всех пунктов настроек, и потом, когда первый лог создавался, инициализировались все внутренние объекты фреймворка согласно настройкам на этот момент времени. Дальше уже ничего поменять было нельзя. Так быть не должно, в идеале любой пункт настроек должно быть возможно изменить в любой момент времени, не потеряв при этом ни одной записи. Дальше станет ясно, почему этого добиться не так уж просто.

  • Каждый пункт настроек должен иметь значение по умолчанию.

  • Всё должно быть защищено от состояния гонки.

  • Хранилище должно иметь интерфейс словаря. Это просто и удобно, а ещё так компоненты фреймворка проще тестировать, подменяя хранилище настроек словарем.

Базовая концепция

У нас есть класс, оборачивающий доступ к словарю, объявленному как один из его атрибутов. Примерно вот такой:

from threading import Lock
from polog.core.utils.reload_engine import reload_engine
from polog.core.utils.read_only_singleton import ReadOnlySingleton
... # Прочие всякие импорты.


class SettingsStore(ReadOnlySingleton):
  # Перечисление пунктов настроек. Каждый пункт - экземпляр класса SettingPoint, внутри которого проводятся все необходимые проверки и блокировки.
  # По сути весь класс SettingsStore проксирует доступ к этому словарю.
  points = {
    'pool_size': SettingPoint(
      2,
      proves={
        'the value must be an integer': lambda x: isinstance(x, int),
        'the value must be greater than or equal to zero': lambda x: x >= 0,
      },
      conflicts={
        'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
      },
      action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
      read_lock=True,
      shared_lock_with=(
        'max_queue_size',
        'started',
      ),
    ),
    'max_queue_size': SettingPoint(
      0,
      proves={
        'the value must be an integer': lambda x: isinstance(x, int),
        'the value must be greater than or equal to zero': lambda x: x >= 0,
      },
      conflicts={
        'pool_size': lambda new_value, old_value, other_field_value: new_value != 0 and other_field_value == 0,
      },
      action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
      read_lock=True,
      shared_lock_with=(
        'pool_size',
        'started',
      ),
    ),
    ... # Там ниже должны быть еще другие пункты настроек, мы их не будем приводить здесь в целях экономии трафика.
  }
  lock = Lock()
  points_are_informed = False

  def __init__(self):
    with self.lock:
      if not self.points_are_informed:
          for name, point in self.points.items():
              # Передаем в объект поля объект хранилища.
              # Таким образом теперь каждый объект поля может обращаться ко всем остальным объектам поля.
              point.set_store_object(self)
              # Оповещаем объект поля о его собственном имени. Это полезно для конструирования сообщений об ошибках, например.
              point.set_name(name)
          for name, point in self.points.items():
              # По умолчанию внутри каждого поля есть собственный объект лока.
              # В данном методе объект поля передает свой лок всем связанным с ним полям.
              # Это нужно, чтобы блокировка одного поля блочила и остальные из этой группы. Используется в ситуациях, когда атомарное изменение должно быть комплексным.
              point.share_lock_object()
          self.points_are_informed = True

  def __getitem__(self, key):
    point = self.get_point(key)
    return point.get()

  def __setitem__(self, key, value):
    point = self.get_point(key)
    point.set(value)

  def get_point(self, key):
    if key not in self.points:
      raise KeyError(f'{key} - there is no settings point with this name.')
    return self.points[key]

  ... # Какие-то прочие методы, для нас пока не существенные.

Ключи в этом словаре — имена пунктов настроек, а значения — объекты, в которых хранятся значения. Именно в этих объектах класса SettingPoint и происходит почти вся описанная выше магия: валидация значений с киданием красивых исключений, блокировки и всё прочее. Далее мы сосредоточимся на устройстве и механизмах внутри этого класса.

Валидируем одиночные значения

Рассмотрение класса SettingPoint мы начнём с самого простого — как валидируются значения. Напомню, что создание экземпляра выглядело примерно так:

SettingPoint(
  2,
  proves={
    'the value must be an integer': lambda x: isinstance(x, int),
    'the value must be greater than or equal to zero': lambda x: x >= 0,
  },
  conflicts={
    'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
  },
  action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
  read_lock=True,
  shared_lock_with=(
    'max_queue_size',
    'started',
  ),
)

На этом этапе нас интересует словарь, который мы передаем для инициализации экземпляра как аргумент proves:

{
  'the value must be an integer': lambda x: isinstance(x, int),
  'the value must be greater than or equal to zero': lambda x: x >= 0,
}

Как несложно догадаться, ключи в этом словаре — фрагменты тех сообщений об ошибках, с которыми будут подниматься исключения. А значения — функции. Вот так выглядит метод объекта SettingPoint, где применяются эти лямбды:

def prove_value(self, value):
  for message, prove in self.proves.items(): # Предварительно словарь с лямбдами был сохранен в экземпляре как self.proves.
    if not prove(value):
      if not hasattr(self, 'name'):
        full_message = f'You used an incorrect value "{value}": {message}.'
      else:
        full_message = f'You used an incorrect value "{value}" for the field "{self.name}": {message}.'
      raise ValueError(full_message)

Как видим, перед поднятием исключения конструируется сообщение, в котором указано, с каким полем возникла проблема и какая именно. Всё это на человеческом английском. К слову говоря, каждый объект SettingPoint был оповещён о своём имени внутри метода init класса SettingsStore.

Ищем конфликты

Немного усложняем задачу: теперь нам нужно проверить, что два разных значения настроек не образуют невозможную комбинацию, хотя каждое из них в отдельности валидно. Для этого при инициализации объекта SettingPoint мы передаём словарь с описанием конфликтов, примерно такой:

{
  'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
}

Ключи в этом словаре — это названия полей, конфликты с которыми мы проверяем, а значения — функции. При каждой попытке сохранить новое значение этого пункта настроек мы передаём в каждую из функций три параметра: новое значение; значение, которое было до этого; и текущее значение того поля, конфликты с которым мы ищем. Для этого в объекте поля запускается простенький метод:

def prove_conflicts(self, old_value, new_value):
  for field_name, conflict_checker in self.conflicts.items(): # Предварительно словарь с описанием конфликтов был сохранен в self.conflicts.
    if conflict_checker(new_value, old_value, self.store.force_get(field_name)): # .force_get() - метод хранилища настроек, позволяющий получить значение любого пункта в обход блокировки, то есть осуществить по сути "грязное чтение". Но о блокировках будет дальше, до них мы еще не дошли.
      raise ValueError(f'The new value "{new_value}" of the field "{self.name}" is incompatible with the current value "{self.store.force_get(field_name)}" of the field "{field_name}".')

Что ж, мы провели два типа проверок и теперь должны быть уверены, что новое значение настройки нам подходит. Теперь наша задача — применить полученные данные.

Меняем настройки в рантайме

Пунктов настроек больше одного, но я опишу конвейер изменения одного из них, с которым пришлось повозиться больше всех. Чтобы объяснить, почему так сложно его изменить, придётся немного погрузить в устройство фреймворка до и после. Будет немного специфики, без неё никуда.

Исходно ядро Polog было исключительно многопоточным, на базе самописного thread pool. Тредпул работал по паттерну издатель-подписчик, то есть состоял из тр`х логических частей:

  1. Очередь для коммуникации.

  2. Объект, который передаёт логи в очередь.

  3. Набор потоков-воркеров. В каждом из них запущен бесконечный цикл, в котором они ожидают появления новых записей в очереди. Как только первый из них забирает лог, он сразу его как-то обрабатывает, например, записывает в файл.

Воркеры создавались «лениво», в момент записи первого лога. Это позволяло применить любые настройки на этапе инициализации программы. Проблема только в том, что тредпул получался «приколоченным гвоздями». После записи первого лога уже ни добавить воркеров, ни убрать лишних. Кроме того, не была предусмотрена возможность использования беспоточного движка: класс движка тоже, получается, приколочен. Валидным размером тредпула в настройках было только число не меньше единицы.

Существует две базовые стратегии изменения тредпула «на лету»:

  1. Изменение текущего тредпула с помощью добавления новых или удаления из него старых воркеров. Это вполне легко реализовать, нужно лишь получить новое число потоков; сравнить его с текущим; определить, добавляем или убираем; и применить изменение.

  2. Обернуть компонент с тредпулом в объект уровнем выше, который при каждом изменении настроек будет пересоздавать пул с нуля.

Казалось бы, первый вариант экономичнее. Надо нам добавить 101-й поток, мы не занимаемся бессмысленным уничтожением и созданием заново предыдущей сотни. Мы просто создаём ещё один и добавляем в пул. Однако я выбрал второй вариант, поскольку его API универсальнее и позволяет легко изменять любые характеристики движка, а не только количество воркеров. Любое изменение — это уничтожение предыдущего компонента и создание нового в соответствии с новыми настройками. У такого решения, кстати, был важный и полезный побочный эффект: стало возможным существование более одного типа движков. Старый уничтожается, а новый может быть каким угодно, он не связан со старым какими-либо общими механизмами.

В новой схеме может быть сколько угодно классов движков. Главное, чтобы движок имел два обязательных метода: write() для записи лога и stop() для самоуничтожения. При уничтожении движок должен подчистить за собой все занятые ресурсы, в том числе присоединить все созданные дополнительные потоки. Текущий движок оборачивается в объект специального класса Engine. «Обёртка» настоящего движка называется так, поскольку для всего окружающего вызывающего кода именно она представляет из себя движок. Вызывающий код не должен знать, что движок на самом деле — это что-то под капотом.

«Настоящих» движков (не обёрток) в настоящее время доступно два типа: синхронный и асинхронный, как я уже писал выше. Их порождает вот такая простенькая функция:

def real_engine_fabric(settings):
  if settings.force_get('pool_size') == 0:
    return SingleThreadedRealEngine(settings)
  return MultiThreadedRealEngine(settings)

Если размер запрошенного пула потоков (настройка pool_size) больше нуля, то создаётся асинхронный движок на базе тредпула. А если равен нулю, то создаётся более примитивный синхронный движок, у которого нет тредпула, а есть просто метод, куда можно передать лог и он будет обработан.

Для понимания, как выглядит минимальный возможный движок, синхронная версия выглядит так:

class SingleThreadedRealEngine(AbstractRealEngine):
  def write(self, log):
    for handler in log.get_handlers():
      self.call_handler(handler, log)

  @exception_escaping
  def call_handler(self, handler, log):
    handler(log)

Это всё. Метод самоуничтожения спрятан в классе, от которого наследуется движок, и не делает ничего, поскольку тут нет никаких ресурсов, которые надо освобождать.

Асинхронный немного сложнее, ведь он должен уметь завершаться, не потеряв ни одного лога в очереди. Давайте на него тоже глянем, прежде чем идти дальше:

class MultiThreadedRealEngine(AbstractRealEngine):
  def __init__(self, settings):
    super().__init__(settings)
    self.pool = ThreadPool(settings)

  def write(self, log):
    self.pool.put(log)

  def stop(self):
    self.pool.stop()

Как видите, вся работа по корректному завершению происходит внутри пула, так что провалимся в него:

import time
from queue import Queue

from polog.core.engine.real_engines.multithreaded.worker import Worker


class ThreadPool:
  def __init__(self, settings):
    self.settings = settings
    self.queue = Queue(maxsize=self.settings.force_get('max_queue_size'))
    self.workers = self.create_workers()

  def put(self, log):
    self.queue.put(log)

  def stop(self):
    self.wait_empty_queue()
    for worker in self.workers:
      worker.set_stop_flag()
    for worker in self.workers:
      worker.stop()

  def wait_empty_queue(self):
    delay = self.settings['time_quant'] * self.settings['delay_on_exit_loop_iteration_in_quants']
    while True:
      if self.queue.empty():
        break
      time.sleep(delay)

  def create_workers(self):
    workers = []
    for index in range(self.settings.force_get('pool_size')):
      worker = Worker(self.queue, index, self.settings)
      workers.append(worker)
    return workers

Итак, завершение работы пула происходит в три этапа:

  1. Дожидаемся, пока очередь с логами опустеет. При этом важно, чтобы с другой стороны её никто не пополнял, но об этом будет дальше. Ожидание опустения очереди — это обычный цикл, в котором с определённой периодичностью запрашивается размер очереди, и который завершается, когда очередь пуста.

  2. Сообщаем каждому воркеру, что пора заканчивать работу. Дело в том, что у него на руках в этот момент ещё может быть последний взятый из очереди лог, поэтому мы не можем прервать его насильно и вынуждены вежливо попросить. Воркер, в свою очередь (на это мы посмотрим дальше), получив такое сообщение, должен закончить с последним взятым из очереди логом, после чего добровольно суициднуться прервать собственный цикл.

  3. Джойним каждый поток с воркером.

Внутри воркера обработка сообщения об остановке выглядит как установка в значение True флага self.stopped:

class Worker:
  def run(self): # Тот самый метод с бесконечным циклом.
    stopped_from_flag = False
    while True:
      try:
        while True:
          try:
            log = self.queue.get(timeout=self.settings['time_quant'])
            break
          except Empty:
            if self.stopped:
              stopped_from_flag = True
              break
        if stopped_from_flag:
          break
        self.do_anything(log)
        self.queue.task_done()
      except Exception as e:
        self.queue.task_done()

  ... # Какие-то прочие методы.

Этот пример кода, возможно, придётся пораскуривать некоторое время, мне и самому его не так просто в голове уместить. Прежде всего требует пояснений, почему здесь два бесконечных цикла, а не один, как я обещал ранее. Дело в том, что если воркер ожидает из очереди данных без таймаута, то возможна ситуация, когда мы дождались бы опустения очереди, проставили флаг завершения и дальше бесконечно ждали бы его. Поэтому воркер ожидает данные из очереди не бесконечно, а периодически «просыпается», чтобы проверить, не попросили ли его прерваться. Не попросили — прекрасно, возвращаемся в очередь.

Выше мы выяснили, как происходит рождение и остановка движков, однако здесь мне придётся вернуться немного назад, чтобы рассказать ещё о паре важных моментов.

Во-первых, как вы могли заметить, изменение даже одного пункта настроек может быть сложной комплексной операцией. Опасно, чтобы одновременно выполнялись сразу две таких операции над одной и той же настройкой. Поэтому все изменения настроек всегда происходят из-под блокировки. Также в некоторых случаях значение настройки является невалидным в промежутке, пока оно устанавливается, но не всегда. К примеру, если изменился размер пула потоков в движке, то где-то «посередине» будет момент, когда старый движок уже уничтожен, а новый ещё не создан. В этот момент текущее значение настройки де-факто врёт и считывать его нельзя. Поэтому для отдельных пунктов настроек можно распространить блокировку записи и на чтение. Для этого при создании объекта SettingPoint нужно передать туда параметр read_lock=True.

Внутри SettingPoint при каждом чтении настройки было бы слишком дорого проверять, установлен такой флаг или нет. Поэтому метод get() де-факто конструируется внутри метода set_read_lock() при инициализации SettingPoint, и дальнейшие вызовы происходят с минимальным оверхедом:

# Методы где-то внутри SettingPoint:
def set_read_lock(self, read_lock):
  if read_lock: # Здесь read_lock - это тот самый булеановский флаг, который мы передали при инициализации объекта SettingPoint.
    self.get = self.locked_get
  else:
    self.get = self.unlocked_get

def unlocked_get(self):
  return self.value

def locked_get(self):
  with self.lock:
    return self.value

Во-вторых, при подмене движка во время работы программы нужно учитывать, что всё это время его обёртку могут вызывать логгеры из разных потоков, поэтому обёртку движка мы также временно блокируем. Код ниже демонстрирует, как происходит блокировка и последующая разблокировка движка при перезагрузке:

#
def reload(self):
  if self.settings['started']: # Настоящая подгрузка движка все еще ленивая, поэтому перезагрузка до записи первого лога лишена всякого смысла.
    self.block()
    self.stop()
    self.load()
    self.unlock()

def block(self):
  if self.settings['started']:
    self.lock.acquire()
    self.write = self._blocked_write

def unlock(self):
  self.write = self._new_write
  self.lock.release()

def _new_write(self, log):
  self.real_engine.write(log)

def _blocked_write(self, log):
  with self.lock:
    self._new_write(log)

Таким образом, любой поток, который попытается записать лог в тот момент, пока движок перезагружается, будет временно заблокирован, и продолжит свое выполнение, когда всё уже завершится, ничего даже не заметив. Лог будет записан как положено и не потеряется.

Если вы добрались до этого места и всё ещё ничего не поняли, то вот краткий пересказ того, как происходит применение нового значения пункта pool_size:

1. Проверяем валидность нового значения.
2. Проверяем конфликты нового значения.
3. Ставим блокировку на запись новых значений этого пункта настроек и всех с ним связанных.
4. Если нужно, ставим блокировку на чтение.
5. Сохраняем новое значение.
6. Запускаем коллбек:
  6.1. Перезагружаем движок:
    6.1.1. Устанавливаем блокировку на запись логов.
    6.1.2. Останавливаем старый движок:
      6.1.2.1. Если он был асинхронный:
        6.1.2.1.1. Дожидаемся, пока очередь опустеет.
        6.1.2.1.2. Устанавливаем для каждого воркера флаг остановки.
        6.1.2.1.2. Внутри воркера:
          6.1.2.1.2.1. Проверяем периодически этот флаг, в том числе пока ожидаем новых данных из очереди.
          6.1.2.1.2.2. Если флаг установлен:
            6.1.2.1.2.2.1. Прерываем бесконечный цикл.
        6.1.2.1.3. Джойним все потоки с воркерами.
      6.1.2.2. Если движок синхронный:
        6.1.2.2.1. Ничего не делаем.
    6.1.3. Создаём экземпляр нового движка:
      6.1.3.1. Внутри фабрики создания движков:
        6.1.3.1.1. Если настройка pool_size установлена в значение 0:
          6.1.3.1.1.1. Создаем синхронный движок.
        6.1.3.1.2. Иначе:
          6.1.3.1.2.1. Создаем асинхронный движок.
    6.1.4. Снимаем блокировку на запись логов.
7. Снимаем блокировку на запись для данного пункта настроек и всех с ним связанных.
8. Если ранее была установлена, то снимаем блокировку на чтение этого пункта настроек.

У других пунктов настроек схема может быть попроще, но принципиально отличаться не будет.

Заключение

Если вы дочитали, а не пролистали, статью до этого места, вам представленный код мог показаться чересчур сложным для решаемой им задачи (а если раньше вы уже делали что-то похожее, возможно, наоборот, слишком простым). Нужно учитывать, что такое навороченное решение с почти реализованной транзакционностью не появляется за один присест. Это результат неоднократного рефакторинга, причём каждая следующая итерация по сути добавляла новый уровень ненаивности в реализацию. И я не знаю, сколько их ещё впереди. Наивные решения задач — это хорошо и эффективно в большинстве случаев, если только речь не про многопоточность. А многопоточность — это сложно.

Комментарии (21)


  1. FeNUMe
    18.11.2021 12:01
    +1

    <зануда_мод>

    "SettingsStore" - магазин настроек? или все же планировалось хранилище SettingsStorage?

    </зануда_мод>


    1. pomponchik Автор
      18.11.2021 12:34

      Я думал об этом. Мне вариант со "Storage" не очень подходит, поскольку навевает ассоциации с персистентностью хранилища, а оно де-факто существует только в оперативной памяти. Между тем, одно из значений "Store" - это как раз "хранилище".


  1. mixsture
    18.11.2021 13:25

    Хмм. Как proof-of-concept идея асинхронности логирования подходит, но добавляет довольно существенные минусы и я бы сторонился такого решения на проде.
    От логов обычно ждут:
    1) точность во времени. Вот поставил я запись в лог между строками программы — вот ровно в этот момент и должно записаться. Асинхронность же тут отдаляет момент записи от реального исполнения. Причем никакой определенности «насколько отдаляет» — нет. Что делать с логом, в котором записи будут отставать по времени на 20 минут? А если у меня 2 лога, в одном отстают, а в другом нет — как их сопоставлять?
    2) последовательность. Логи — это часто отладочная информация, как вызываются функции друг за другом. А в документации «при работе асинхронного движка не гарантируется правильный порядок записи логов».

    Автор, а для чего вы используете эту штуку?


    1. pomponchik Автор
      18.11.2021 14:01

      Фреймворк универсален, может работать как в синхронном, так и в асинхронном режиме. Для отладки можно использовать синхронный. Асинхронный предназначен в первую очередь для повышения пиковой производительности высоконагруженных сервисов. Если к нам в моменте пришла куча запросов, мы сначала обработаем их, а уже потом в бэкграунде запишем куда-то логи.

      Точность по времени там обычно не особо нужна, однако вы можете уменьшить разброс установкой лимита на максимальный размер очереди.

      Последовательность именно записи часто тоже не принципиальна, если по самим логам можно восстановить последовательность событий. А так можно, поскольку каждое событие сопровождается временной меткой, соответственно при чтении или индексации логов в поисковой машине можно просто сортировать по этой метке.

      В целом асинхронный движок для логов - это компромисс между пиковой производительностью сервиса и упомянутыми вами свойствами логов. Чем больше мы теряем в первом, тем больше приобретаем во втором, и наоборот. Здесь уже нужно выбирать по потребности, и фреймворк дает такую возможность.

      Я пока фреймворк практически не использую, только пишу. Его текущая версия начинается с нуля, то есть на мой взгляд, он пока не готов для промышленного применения (хотя его уже можно использовать в каких-нибудь хобби-проектах).


  1. x2v0
    18.11.2021 18:42
    -1

    мысли в слух ...

    а, не легче ли было взять готовое, старое, проверенное решение на C и завернуть его в cpython?


    1. pomponchik Автор
      19.11.2021 12:31

      Это какое?


  1. sshmakov
    18.11.2021 23:03

    Можно сделать так - при каждом изменении создается объект настроек, содержащий их полный комплект. Он целиком валидируется и кладется в очередь, потоконезависимую, конечно. Отдельный тред читает эту очередь и применяет настройки - по сути замещает текущий объект настроек новым из очереди.

    Так не проще?


    1. pomponchik Автор
      19.11.2021 12:34

      Не, статья немного о другом. Очередь, движок и прочее используются лишь как пример отработки коллбека при изменении одного пункта настроек. Как настройки за счет защищенных локами коллбеков становятся интерактивными, а не задаются жестко при старте программы. Объект настроек никуда не передается через очередь. Наоборот, это его все отовсюду дергают.


      1. sshmakov
        19.11.2021 13:04

        Статья о другом. Но я предложил вариант, в котором просто не надо работать с отдельными настройками в многопоточке.

        Впрочем, не настаиваю. Ваш код, ваша статья, ваше родео.


        1. pomponchik Автор
          19.11.2021 14:19

          В принципе подход с "не изменять, а копировать" в многопоточке валидный и часто решает много проблем, но для меня не очевидно, как именно он применим в данном случае.


  1. danilovmy
    19.11.2021 01:18
    +3

    Привет. Интересно было посмотреть, как работают новички с параллельным выполнением кода, спасибо.

    Немного поправлю:

    class ThreadPool:
      def wait_empty_queue(self):
        delay = self.settings['time_quant'] * self.settings['delay_on_exit_loop_iteration_in_quants']
        while not self.queue.empty():
          time.sleep(delay)

    где Delay, вероятно, стоит вычислять отдельной функцией.

    Кстати, большинство циклов в коде можно сделать генераторами.

    ну и метод run явно может быть проще:

    while not self.stopped:
        try:
            log = self.queue.get(timeout=self.settings['time_quant'])
            self.do_anything(log)
            self.queue.task_done()
        except Empty:
            pass
        except Exception as e:
            self.queue.task_done()

    смотри. сам же пишешь сообщения об остановке выглядит как установка в значение True флага self.stopped.Так и проверяй его в первом цикле.

    Отличие, в моем варианте в том, что если на сбое пришел стоп - мой вариант закончит раньше работу.

    В твоем варианте будет считывать дальше из queue и делать do_anything пока не выпадет except Empty. Подумай, действительно ли ты уверен в двух циклах. Выглядит, неостанавливаемо, если self.stopped но не выпадает Empty


    1. pomponchik Автор
      19.11.2021 12:31

      Привет. Мне нравятся эти улучшения. Первое точно затащу к себе, насчет второго есть нюанс.

      Я не стал включать это статью, т.к. счел мелкой подробностью, которыми ее не стоит загромождать, но в доке к Queue.empty() есть приписка:

      Return True if the queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

      То есть метод empty() на самом деле не точный и дает примерный результат. Т.к. моя цель - не потерять ни одного лога, я не могу закончить работу и выйти из цикла, пока не буду точно убежден, что очередь пуста. Да, на это уйдет еще один time_quant, но это цена гарантии.


      1. danilovmy
        19.11.2021 14:41

        ну тогда так. Но шансов прервать процедуру принудительно не останется.

        work=True
        while work:
            try:
                log = self.queue.get(timeout=self.settings['time_quant'])
                self.do_anything(log)
                self.queue.task_done()
            except Empty:
                work = not self.stopped
            except Exception as e:
                self.queue.task_done()

        это аналог твоего кода.


        1. pomponchik Автор
          19.11.2021 15:00

          Да, это очень похоже на правду. Пораскуриваю еще, и если не найду проблем - тоже утащу себе.


  1. t0rr
    19.11.2021 11:49
    +1

    Хотелось бы в статье видеть некий сравнительный анализ:

    1. Чем не устраивает валидация в pydantic?

    2. Чем не устраивают неблокирущие примеры logging из документации python? (там есть варианты и с мультипроцессингом, и с тредами)


    1. pomponchik Автор
      19.11.2021 12:20

      1. Хороша, но не хочу тащить такую громадную зависимость только ради валидации нескольких полей. Валидация здесь - маленький кусок общей задачи. Отсутствие внешних зависимостей - одна из главных фич проекта.

      2. Между моим фреймворком и logging слишком много отличий, чтобы здесь их все перечислить. Конкретно в вопросе неблокирующего движка главных отличий 2: у меня можно изменить количество воркеров в рантайме в любой момент, не потеряв ни одного лога, или даже безболезненно сменить движок; у меня есть защита при завершении программы от потери логов, оставшихся в очереди.


      1. snakers4
        19.11.2021 14:06
        +1

        На эту же тему, есть вроде прекрасная либа loguru. Весьма популярная, минималистичная в использовании и по заверениям авторов Asynchronous, Thread-safe, Multiprocess-safe.


        Просто если задаться задачей "не тащить", а самому писать даже логирование, то тяжело будет далеко уехать.


        1. pomponchik Автор
          19.11.2021 14:18

          Я в курсе про нее. Она мне не нравится по ряду причин, начиная с того, как написан и декомпозирован код, как там все прибито гвоздями в архитектурном плане.

          Но главная причина написания собственного фреймворка - это желание создать собственную мощную систему декораторов логирования. Ничего подобного я до сих пор не встречал нигде, и в loguru этого тоже нет. Практически все существующие логгинг-либы так или иначе наследуют уродливый интерфейс стандартного logging, я же сделал систему, которая, на мой взгляд, сильно удобнее. Конструкция ядра фреймворка здесь вторична, но раз так уж вышло, что там нашлось что-то интересное - решил об этом написать.

          Когда-нибудь я напишу статью про свой фреймворк и чем он лучше остальных, но сегодня это не совсем по теме.


        1. t0rr
          19.11.2021 15:15

          loguru - прекрасная? От неё больше проблем, чем пользы... Начните хотя бы с тестирования производительности

          Сноска от авторов про "10x faster" давно зачёркнута с припиской "In an upcoming release, Loguru's critical functions will be implemented in C for maximum speed.", но этот upcoming release уже давно не может произойти )


      1. t0rr
        19.11.2021 15:26

        То, что отличий много - понятно. На мой взгляд остался не раскрыт профит от использования всего этого добра. Замена настроек логирования в рантайме - очень специфичная задача, больше похожая на костыль, чем на повседневную потребность. Блю-грин никто не отменял.

        Т.е. прочитав статью, я остался при мнении, что в случае необходимости валидации я возьму pydantic, а в случае необходимости неблокирующего logging возьму пример из документации. А хотелось бы зайти в статью, найти что-то полезное и унести с собой ))

        Прочитал тезис про "красиво" - спорить не буду. Но хочется посмотреть на тесты производительности - это была одна из причин отказа от loguru и возврат к обычному logging


        1. pomponchik Автор
          19.11.2021 19:15

          Статья главным образом не о логгере, а о том, как в принципе можно сделать настройки чего угодно изменяемыми в рантайме, на примере логгера. Но я попробую в ближайшее время написать статью-сравнение, уже целенаправленно о логгере.