Постановка проблемы
Есть приложение, в котором выполняет несколько функций, например, сбора данных из различных источников, их обработки и помещения результатов в БД. Приложение, по задумке, должно работать 24/7, чтобы в любой момент можно было подключиться к БД и получить свежайшую информацию.
Но вот незадача... Вроде бы весь код отлажен, работа приложения стабильна, но в какие‑то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно, а работа стоит и надо как‑то запускаться. На отладку нет много времени.
Симуляция
Представим, что есть две простые функции, которые что‑то делают (не важно, чем они заняты). Если мы говорим о мультипроцессинге, то мы говорим, что каждая функция запускается в системе отдельно, как отдельная задача, отследить которую можно в Диспетчере задач. Значит у процесса есть PID.
Представим, что третий процесс — «следилка». Следилка занимается тем, что она отслеживает существует ли ещё процесс с таким PID в системе. Если работа функции завершается или процесс самопроизвольно погибает, то её задача увидеть это (т. е. не увидеть такой PID) и перезапустить процесс, заново поместив туда функцию.
Очевидно, что нужно имитировать падение процесса, т. е. должна быть функция, которая искусственно убивает процесс.
Также хочется, чтобы уж быть максимально приближенным к реальности, не передавать переменные между процессами, чтобы все работали автономно.
Реализация
Вот пример двух простых функций, которые живут и что‑то делают (можно не смотреть скрытый текст, т. н. функции будут представлены также в полных текстах решения):
Hidden text
def proc_1(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_1 COMPLETED!")
break
time.sleep(x)
def proc_2(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_2 COMPLETED!")
break
time.sleep(x)
Было реализовано два примера, которые отличаются тем, что функция, отвечающая за слежение и восстановление, в первом случае, живёт в основном процессе приложения, а во втором — запускается отдельным процессом. Второй случай усложняется тем, что перезапуск нового процесса производится из функции восстановления и если выполнить join в ней, то сама она также будет ждать. Решением это проблемы явилось запуск join в отдельном потоке.
Второй вариант, кстати, выглядит приятней и логичней.
Отдельным моментом является понимание того, в какой операционной системе вы запускаете работу приложения. Пришлось делать «оговорку» на Linux по понятным причинам.
Вот первый вариант реализации (полный текст решения):
# Работа с процессами - рабочая версия
import multiprocessing as mp
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def proc_1(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_1 COMPLETED!")
break
time.sleep(x)
def proc_2(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_2 COMPLETED!")
break
time.sleep(x)
def process_killer(PID_list, timepass, exclusion_list=[]):
while True:
system = platform.system()
x = random.uniform(0, timepass)
time.sleep(x)
this_process_pid = os.getpid()
this_process = psutil.Process(this_process_pid)
this_process_info_as_dict = this_process.as_dict()
parent_process_pid = this_process.ppid()
parent_process = psutil.Process(parent_process_pid)
parent_process_as_dict = parent_process.as_dict()
parent_process_children = parent_process.children(recursive=True)
child_pid_list = []
for i in range(len(parent_process_children)):
child_info_as_dict = parent_process_children[i].as_dict()
if child_info_as_dict['pid'] != this_process_pid:
child_pid_list.append(child_info_as_dict['pid'])
child_pid_list = list(set(child_pid_list) - set(exclusion_list))
child_pid_list = list(set(child_pid_list) - set([this_process_pid]))
# for i in range(len(child_pid_list)):
# print(f'Process_{i+1} PID: {child_pid_list[i]}')
temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
print(f"{temp_str}")
if len(child_pid_list) > 0:
if len(child_pid_list) > 1:
number = random.randint(0, len(child_pid_list) - 1)
else:
number = 0
kill_proc = psutil.Process(child_pid_list[number])
kill_process_info_as_dict = kill_proc.as_dict()
if psutil.pid_exists(kill_process_info_as_dict['pid']):
if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
print("We kill the process with PID", kill_process_info_as_dict['pid'])
try:
process = psutil.Process(kill_process_info_as_dict['pid'])
except psutil.NoSuchProcess:
print(f"Process with PID {child_pid_list[number]} not found.")
continue
else:
if system == "Windows":
kill_proc.kill()
elif system == "Linux":
os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
# os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
# subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])
print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
child_pid_list.remove(kill_process_info_as_dict['pid'])
if __name__ == "__main__":
print(f'{bcolors.OKGREEN}Начало работы программы!{bcolors.ENDC}')
process_name_list = ["process_1", "process_2"]
process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
process_1.start()
process_2.start()
PID_list = [process_1.pid, process_2.pid]
# process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
# process_recov.start()
process_kill = mp.Process(target=process_killer,
kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})
process_kill.start()
PID_list.append(process_kill.pid)
system = platform.system()
while True:
if system == "Linux":
os.wait()
for i in range(len(PID_list)):
try:
if psutil.pid_exists(PID_list[i]):
pass
# print(f'Process with PID {process_pid} is alive')
else:
print(f'Process with PID {PID_list[0]} is dead')
print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
if PID_list[i] == PID_list[0]:
process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
if PID_list[i] == PID_list[1]:
process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
process.start()
old_pid = PID_list[i]
PID_list[i] = process.pid
temp_str=""
for i in range(len(PID_list)):
if PID_list[i]==process.pid:
temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {PID_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
else:
temp_str += f'Process_{i + 1} PID: {PID_list[i]}\n'
temp_str = temp_str.rstrip()
# temp_str = '\n'.join(
# [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
print('Recovery result:\n' + temp_str)
except:
pass
time.sleep(0.2)
print("Main PID:", os.getpid())
print("Process_1 PID:", process_1.pid)
print("Process_2 PID:", process_2.pid)
print("Process_killer PID:", process_kill.pid)
process_1.join()
process_2.join()
process_kill.join()
# process_recov.join()
process.join()
time.sleep(5)
print("Program completed")
А вот второй:
# Работа с процессами - рабочая, но сомнительная версия
import multiprocessing as mp
import threading
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def proc_1(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_1 COMPLETED!")
break
time.sleep(x)
def proc_2(timepass=1, repeat=20):
repeated = True
i = 0
while repeated:
x = random.uniform(0, timepass)
i += 1
if i < repeat:
print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
else:
repeated = False
print("Proc_2 COMPLETED!")
break
time.sleep(x)
def process_killer(PID_list, timepass, exclusion_list=[]):
while True:
system = platform.system()
x = random.uniform(0, timepass)
time.sleep(x)
this_process_pid = os.getpid()
this_process = psutil.Process(this_process_pid)
this_process_info_as_dict = this_process.as_dict()
parent_process_pid = this_process.ppid()
parent_process = psutil.Process(parent_process_pid)
parent_process_as_dict = parent_process.as_dict()
parent_process_children = parent_process.children(recursive=True)
child_pid_list = []
for i in range(len(parent_process_children)):
child_info_as_dict = parent_process_children[i].as_dict()
if child_info_as_dict['pid'] != this_process_pid:
child_pid_list.append(child_info_as_dict['pid'])
child_pid_list = list(set(child_pid_list) - set(exclusion_list))
# for i in range(len(child_pid_list)):
# print(f'Process_{i+1} PID: {child_pid_list[i]}')
temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
print(f"{temp_str}")
if len(child_pid_list) > 0:
if len(child_pid_list) > 1:
number = random.randint(0, len(child_pid_list) - 1)
else:
number = 0
kill_proc = psutil.Process(child_pid_list[number])
kill_process_info_as_dict = kill_proc.as_dict()
if psutil.pid_exists(kill_process_info_as_dict['pid']):
if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
print("We kill the process with PID", kill_process_info_as_dict['pid'])
try:
process = psutil.Process(kill_process_info_as_dict['pid'])
except psutil.NoSuchProcess:
print(f"Process with PID {child_pid_list[number]} not found.")
continue
else:
if system == "Windows":
kill_proc.kill()
elif system == "Linux":
os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
# os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
# subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])
print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
child_pid_list.remove(kill_process_info_as_dict['pid'])
def process_recovery(process_pid_list):
PID_list = process_pid_list
this_process_pid = os.getpid()
this_process = psutil.Process(this_process_pid)
this_process_info_as_dict = this_process.as_dict()
parent_process_pid = this_process.ppid()
parent_process = psutil.Process(parent_process_pid)
parent_process_as_dict = parent_process.as_dict()
parent_process_children = parent_process.children(recursive=True)
child_pid_list = []
for i in range(len(parent_process_children)):
child_info_as_dict = parent_process_children[i].as_dict()
if child_info_as_dict['pid'] != this_process_pid:
child_pid_list.append(child_info_as_dict['pid'])
system = platform.system()
while True:
if system == "Linux":
try:
os.wait()
except:
pass
for i in range(len(child_pid_list)):
try:
if psutil.pid_exists(child_pid_list[i]):
pass
# print(f'Process with PID {process_pid} is alive')
else:
print(f'Process with PID {child_pid_list[i]} is dead')
print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
if child_pid_list[i] == PID_list[0]:
process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
elif child_pid_list[i] == PID_list[1]:
process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
else:
process = mp.Process(target=process_killer,
kwargs={'PID_list': PID_list, 'timepass': 10,
'exclusion_list': [process_recov.pid, ]})
process.start()
old_pid = child_pid_list[i]
child_pid_list[i] = process.pid
if old_pid == PID_list[0]:
PID_list[0] = process.pid
if old_pid == PID_list[1]:
PID_list[1] = process.pid
temp_str = ""
for i in range(len(child_pid_list)):
if child_pid_list[i] == process.pid:
temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {child_pid_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
else:
temp_str += f'Process_{i + 1} PID: {child_pid_list[i]}\n'
temp_str = temp_str.rstrip()
# temp_str = '\n'.join(
# [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
print('Recovery result:\n' + temp_str)
threading.Thread(target=process.join).start()
except:
pass
time.sleep(0.2)
if __name__ == "__main__":
process_name_list = ["process_1", "process_2"]
process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
process_1.start()
process_2.start()
PID_list = [process_1.pid, process_2.pid]
process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
process_recov.start()
process_kill = mp.Process(target=process_killer,
kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]})
#process_kill.start()
print("Main PID:", os.getpid())
print("Process_1 PID:", process_1.pid)
print("Process_2 PID:", process_2.pid)
print("Process_killer PID:", process_kill.pid)
process_1.join()
process_2.join()
#process_kill.join()
process_recov.join()
time.sleep(5)
print("Program completed")
Надеюсь, что, если вдруг, вы столкнётесь с подобной проблемой и не захотите тратить время на поиска ошибки — фантома, то вам поможет такое решение проблемы.
Комментарии (18)
dyadyaSerezha
14.06.2023 16:30+2except вместе с pass всегда требует комментария, почему исключения игнорируются.
rick1177 Автор
14.06.2023 16:30-1Вы имели виду по коду... ну в данном случае там возникало сравнение с None. Можно обработчик поставить, а можно просто обработать ошибку и прекратить выполнение. Результат одинаков.
rSedoy
14.06.2023 16:30+1этот блок try-except охватывает большое количество кода, там не только "возникало сравнение с None" (которое надо делать явно) но и куча других исключений может быть, но про них никогда не узнаем, потому они все замалчиваются.
poxvuibr
14.06.2023 16:30+3куча других исключений может быть, но про них никогда не узнаем, потому они все замалчиваются
Если прочитать статью то создаётся впечателение, что именно этот подход её и родил )). Вот, приведу цитату
Но вот незадача... Вроде бы весь код отлажен, работа приложения стабильна, но в какие‑то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно
Почему же в логах нет ошибок, как же так могло получиться ))
CrazyElf
14.06.2023 16:30Да и есть ли сами логи? Даже при отсутствии информации об ошибках просто грамотное логирование часто может помочь найти место, где код падает. Когда знаешь, что по коду за такой-то операцией должна идти такая-то, а лог именно в этом месте прерывается.
rick1177 Автор
14.06.2023 16:30В оригинальном приложении, из которого родилась эта частная задача, производилось соединение и запрос к источнику данных, источник данных возвращал положительный код ответа, но какую-то иногда выплёвывал чушь, хотя не должен был. Отправитель считал, что он отправил всё корректно. При этом эта чушь даже по структуре была разная, какими-то странными символами и прочее. Мы обратились к производителю и не смогли добиться внятного ответа, ушёл в молчание (видимо сам анализирует проблему). Наше приложение не могло идентифицировать эту разнообразную чушь и падало. Мы сначала писали кучу обработчиков, но тщетно... вот пришли к такому решению. Видимо не понравилось оно сообществу. Один хейт в комментариях.
CrazyElf
14.06.2023 16:30Вообще, если приложению приходит чушь, оно должно кидать и ловить эксепшены. На каких-то уровнях приложения эксепшены должны ловиться и обрабатываться - запись в логи и воостановление процессов должны быть в обработчике. А вы, видимо, на произвол судьбы всё бросаете, вот и результат.
rick1177 Автор
14.06.2023 16:30Как-будто вообще не вниклив суть. Стороннее приложение отправляет чушь.
CrazyElf
14.06.2023 16:30И что? Почему на вашей то стороне вы не можете исключение поймать, обработать и сделать повторный запрос?
rick1177 Автор
14.06.2023 16:30Ну потому что приходит код, как будто все норм, а по факту, нет. Когда разбирать начинаем, то летит ... нужно кучу обработчик писать. Зачем?
dyadyaSerezha
14.06.2023 16:30Не в результате дело (и не очевидно, что он один и тот же), а в явном объяснении своей логики там, где она не очевидна.
fedorro
/fixed
А если серьезно - даже без докера, и в Windows, и в Linux есть встроенные средства для перезапуска упавшего процесса.
rick1177 Автор
Мне кажется в суть вопроса вы не вникли... не нужны средства, если есть программное решение.
fedorro
А Вы не вникли в суть моего комментария: не нужны программные решения, если уже есть готовые средства ????
rick1177 Автор
А, ну ту может быть, но я тогда не умею сего. Буду благодарен за встречную статью ))