Постановка проблемы

Есть приложение, в котором выполняет несколько функций, например, сбора данных из различных источников, их обработки и помещения результатов в БД. Приложение, по задумке, должно работать 24/7, чтобы в любой момент можно было подключиться к БД и получить свежайшую информацию.

Но вот незадача... Вроде бы весь код отлажен, работа приложения стабильна, но в какие‑то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно, а работа стоит и надо как‑то запускаться. На отладку нет много времени.

Симуляция

Представим, что есть две простые функции, которые что‑то делают (не важно, чем они заняты). Если мы говорим о мультипроцессинге, то мы говорим, что каждая функция запускается в системе отдельно, как отдельная задача, отследить которую можно в Диспетчере задач. Значит у процесса есть PID.

Представим, что третий процесс — «следилка». Следилка занимается тем, что она отслеживает существует ли ещё процесс с таким PID в системе. Если работа функции завершается или процесс самопроизвольно погибает, то её задача увидеть это (т. е. не увидеть такой PID) и перезапустить процесс, заново поместив туда функцию.

Очевидно, что нужно имитировать падение процесса, т. е. должна быть функция, которая искусственно убивает процесс.

Также хочется, чтобы уж быть максимально приближенным к реальности, не передавать переменные между процессами, чтобы все работали автономно.

Реализация

Вот пример двух простых функций, которые живут и что‑то делают (можно не смотреть скрытый текст, т. н. функции будут представлены также в полных текстах решения):

Hidden text
def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)

Было реализовано два примера, которые отличаются тем, что функция, отвечающая за слежение и восстановление, в первом случае, живёт в основном процессе приложения, а во втором — запускается отдельным процессом. Второй случай усложняется тем, что перезапуск нового процесса производится из функции восстановления и если выполнить join в ней, то сама она также будет ждать. Решением это проблемы явилось запуск join в отдельном потоке.

Второй вариант, кстати, выглядит приятней и логичней.

Отдельным моментом является понимание того, в какой операционной системе вы запускаете работу приложения. Пришлось делать «оговорку» на Linux по понятным причинам.

Вот первый вариант реализации (полный текст решения):

# Работа с процессами - рабочая версия

import multiprocessing as mp
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess


class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    
def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=[]):
    while True:
        system = platform.system()
        x = random.uniform(0, timepass)
        time.sleep(x)
        this_process_pid = os.getpid()
        this_process = psutil.Process(this_process_pid)
        this_process_info_as_dict = this_process.as_dict()
        parent_process_pid = this_process.ppid()
        parent_process = psutil.Process(parent_process_pid)
        parent_process_as_dict = parent_process.as_dict()
        parent_process_children = parent_process.children(recursive=True)
        child_pid_list = []
        for i in range(len(parent_process_children)):
            child_info_as_dict = parent_process_children[i].as_dict()
            if child_info_as_dict['pid'] != this_process_pid:
                child_pid_list.append(child_info_as_dict['pid'])
        child_pid_list = list(set(child_pid_list) - set(exclusion_list))
        child_pid_list = list(set(child_pid_list) - set([this_process_pid]))
        # for i in range(len(child_pid_list)):
        #     print(f'Process_{i+1} PID: {child_pid_list[i]}')
        temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
        print(f"{temp_str}")

        if len(child_pid_list) > 0:
            if len(child_pid_list) > 1:
                number = random.randint(0, len(child_pid_list) - 1)
            else:
                number = 0
            kill_proc = psutil.Process(child_pid_list[number])
            kill_process_info_as_dict = kill_proc.as_dict()
            if psutil.pid_exists(kill_process_info_as_dict['pid']):
                if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
                    print("We kill the process with PID", kill_process_info_as_dict['pid'])
                    try:
                        process = psutil.Process(kill_process_info_as_dict['pid'])
                    except psutil.NoSuchProcess:
                        print(f"Process with PID {child_pid_list[number]} not found.")
                        continue
                    else:
                        if system == "Windows":
                            kill_proc.kill()
                        elif system == "Linux":
                            os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
                            # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
                            # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])

                        print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
                        child_pid_list.remove(kill_process_info_as_dict['pid'])




if __name__ == "__main__":
    print(f'{bcolors.OKGREEN}Начало работы программы!{bcolors.ENDC}')

    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    # process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    # process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})
    process_kill.start()

    PID_list.append(process_kill.pid)

    system = platform.system()

    while True:
        if system == "Linux":
            os.wait()
        for i in range(len(PID_list)):
            try:
                if psutil.pid_exists(PID_list[i]):
                    pass
                    # print(f'Process with PID {process_pid} is alive')
                else:
                    print(f'Process with PID {PID_list[0]} is dead')
                    print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                    if PID_list[i] == PID_list[0]:
                        process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                    if PID_list[i] == PID_list[1]:
                        process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                    process.start()
                    old_pid = PID_list[i]
                    PID_list[i] = process.pid

                    temp_str=""
                    for i in range(len(PID_list)):
                        if  PID_list[i]==process.pid:
                            temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {PID_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
                        else:
                            temp_str += f'Process_{i + 1} PID: {PID_list[i]}\n'
                    temp_str = temp_str.rstrip()
                    # temp_str = '\n'.join(
                    #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
                    print('Recovery result:\n' + temp_str)
            except:
                pass
        time.sleep(0.2)





    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    process_1.join()
    process_2.join()
    process_kill.join()
    # process_recov.join()
    process.join()

    time.sleep(5)

    print("Program completed")

А вот второй:

# Работа с процессами - рабочая, но сомнительная версия
import multiprocessing as mp
import threading
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=[]):
    while True:
        system = platform.system()
        x = random.uniform(0, timepass)
        time.sleep(x)
        this_process_pid = os.getpid()
        this_process = psutil.Process(this_process_pid)
        this_process_info_as_dict = this_process.as_dict()
        parent_process_pid = this_process.ppid()
        parent_process = psutil.Process(parent_process_pid)
        parent_process_as_dict = parent_process.as_dict()
        parent_process_children = parent_process.children(recursive=True)
        child_pid_list = []
        for i in range(len(parent_process_children)):
            child_info_as_dict = parent_process_children[i].as_dict()
            if child_info_as_dict['pid'] != this_process_pid:
                child_pid_list.append(child_info_as_dict['pid'])
        child_pid_list = list(set(child_pid_list) - set(exclusion_list))
        # for i in range(len(child_pid_list)):
        #     print(f'Process_{i+1} PID: {child_pid_list[i]}')
        temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
        print(f"{temp_str}")

        if len(child_pid_list) > 0:
            if len(child_pid_list) > 1:
                number = random.randint(0, len(child_pid_list) - 1)
            else:
                number = 0
            kill_proc = psutil.Process(child_pid_list[number])
            kill_process_info_as_dict = kill_proc.as_dict()
            if psutil.pid_exists(kill_process_info_as_dict['pid']):
                if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
                    print("We kill the process with PID", kill_process_info_as_dict['pid'])
                    try:
                        process = psutil.Process(kill_process_info_as_dict['pid'])
                    except psutil.NoSuchProcess:
                        print(f"Process with PID {child_pid_list[number]} not found.")
                        continue
                    else:
                        if system == "Windows":
                            kill_proc.kill()
                        elif system == "Linux":
                            os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
                            # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
                            # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])

                        print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
                        child_pid_list.remove(kill_process_info_as_dict['pid'])


def process_recovery(process_pid_list):
    PID_list = process_pid_list
    this_process_pid = os.getpid()
    this_process = psutil.Process(this_process_pid)
    this_process_info_as_dict = this_process.as_dict()
    parent_process_pid = this_process.ppid()
    parent_process = psutil.Process(parent_process_pid)
    parent_process_as_dict = parent_process.as_dict()
    parent_process_children = parent_process.children(recursive=True)
    child_pid_list = []
    for i in range(len(parent_process_children)):
        child_info_as_dict = parent_process_children[i].as_dict()
        if child_info_as_dict['pid'] != this_process_pid:
            child_pid_list.append(child_info_as_dict['pid'])

    system = platform.system()

    while True:
        if system == "Linux":
            try:
                os.wait()
            except:
                pass
        for i in range(len(child_pid_list)):
            try:
                if psutil.pid_exists(child_pid_list[i]):
                    pass
                    # print(f'Process with PID {process_pid} is alive')
                else:
                    print(f'Process with PID {child_pid_list[i]} is dead')
                    print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                    if child_pid_list[i] == PID_list[0]:
                        process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                    elif child_pid_list[i] == PID_list[1]:
                        process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                    else:
                        process = mp.Process(target=process_killer,
                                                  kwargs={'PID_list': PID_list, 'timepass': 10,
                                                          'exclusion_list': [process_recov.pid, ]})
                    process.start()
                    old_pid = child_pid_list[i]
                    child_pid_list[i] = process.pid
                    if old_pid == PID_list[0]:
                        PID_list[0] = process.pid
                    if old_pid == PID_list[1]:
                        PID_list[1] = process.pid

                    temp_str = ""
                    for i in range(len(child_pid_list)):
                        if child_pid_list[i] == process.pid:
                            temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {child_pid_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
                        else:
                            temp_str += f'Process_{i + 1} PID: {child_pid_list[i]}\n'
                    temp_str = temp_str.rstrip()
                    # temp_str = '\n'.join(
                    #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
                    print('Recovery result:\n' + temp_str)
                    threading.Thread(target=process.join).start()
            except:
                pass
        time.sleep(0.2)


if __name__ == "__main__":
    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]})
    #process_kill.start()



    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    process_1.join()
    process_2.join()
    #process_kill.join()
    process_recov.join()

    time.sleep(5)

    print("Program completed")

Надеюсь, что, если вдруг, вы столкнётесь с подобной проблемой и не захотите тратить время на поиска ошибки — фантома, то вам поможет такое решение проблемы.

Комментарии (18)


  1. fedorro
    14.06.2023 16:30
    +4

    services:
    ...
        restart: always

    /fixed

    А если серьезно - даже без докера, и в Windows, и в Linux есть встроенные средства для перезапуска упавшего процесса.


    1. rick1177 Автор
      14.06.2023 16:30
      -3

      Мне кажется в суть вопроса вы не вникли... не нужны средства, если есть программное решение.


      1. fedorro
        14.06.2023 16:30
        +8

        А Вы не вникли в суть моего комментария: не нужны программные решения, если уже есть готовые средства ????


        1. rick1177 Автор
          14.06.2023 16:30
          -3

          А, ну ту может быть, но я тогда не умею сего. Буду благодарен за встречную статью ))


  1. dyadyaSerezha
    14.06.2023 16:30
    +2

    except вместе с pass всегда требует комментария, почему исключения игнорируются.


    1. rick1177 Автор
      14.06.2023 16:30
      -1

      Вы имели виду по коду... ну в данном случае там возникало сравнение с None. Можно обработчик поставить, а можно просто обработать ошибку и прекратить выполнение. Результат одинаков.


      1. rSedoy
        14.06.2023 16:30
        +1

        этот блок try-except охватывает большое количество кода, там не только "возникало сравнение с None" (которое надо делать явно) но и куча других исключений может быть, но про них никогда не узнаем, потому они все замалчиваются.


        1. poxvuibr
          14.06.2023 16:30
          +3

          куча других исключений может быть, но про них никогда не узнаем, потому они все замалчиваются

          Если прочитать статью то создаётся впечателение, что именно этот подход её и родил )). Вот, приведу цитату

          Но вот незадача... Вроде бы весь код отлажен, работа приложения стабильна, но в какие‑то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно

          Почему же в логах нет ошибок, как же так могло получиться ))


          1. CrazyElf
            14.06.2023 16:30

            Да и есть ли сами логи? Даже при отсутствии информации об ошибках просто грамотное логирование часто может помочь найти место, где код падает. Когда знаешь, что по коду за такой-то операцией должна идти такая-то, а лог именно в этом месте прерывается.


          1. rick1177 Автор
            14.06.2023 16:30

            В оригинальном приложении, из которого родилась эта частная задача, производилось соединение и запрос к источнику данных, источник данных возвращал положительный код ответа, но какую-то иногда выплёвывал чушь, хотя не должен был. Отправитель считал, что он отправил всё корректно. При этом эта чушь даже по структуре была разная, какими-то странными символами и прочее. Мы обратились к производителю и не смогли добиться внятного ответа, ушёл в молчание (видимо сам анализирует проблему). Наше приложение не могло идентифицировать эту разнообразную чушь и падало. Мы сначала писали кучу обработчиков, но тщетно... вот пришли к такому решению. Видимо не понравилось оно сообществу. Один хейт в комментариях.


            1. CrazyElf
              14.06.2023 16:30

              Вообще, если приложению приходит чушь, оно должно кидать и ловить эксепшены. На каких-то уровнях приложения эксепшены должны ловиться и обрабатываться - запись в логи и воостановление процессов должны быть в обработчике. А вы, видимо, на произвол судьбы всё бросаете, вот и результат.


              1. rick1177 Автор
                14.06.2023 16:30

                Как-будто вообще не вниклив суть. Стороннее приложение отправляет чушь.


                1. CrazyElf
                  14.06.2023 16:30

                  И что? Почему на вашей то стороне вы не можете исключение поймать, обработать и сделать повторный запрос?


                  1. rick1177 Автор
                    14.06.2023 16:30

                    Ну потому что приходит код, как будто все норм, а по факту, нет. Когда разбирать начинаем, то летит ... нужно кучу обработчик писать. Зачем?


      1. dyadyaSerezha
        14.06.2023 16:30

        Не в результате дело (и не очевидно, что он один и тот же), а в явном объяснении своей логики там, где она не очевидна.


  1. astmix
    14.06.2023 16:30

    systemd?


    1. rick1177 Автор
      14.06.2023 16:30

      А что имело ввиду?


      1. astmix
        14.06.2023 16:30

        С данной задачей вполне справляется systemd. (Restart=always, если правильно помню)