Давеча понадобилось мне в моем проекте на Flask ускорить ответ сервера. Из-за того, что во view последовательно вызывается запрос к трём удаленным веб-сервисам, время загрузки страницы с данными не из кеша доходило до 10 сек. Да, возможно, Flask не тот фреймворк, который стоило использовать, но что имеем, то имеем.
Итак, приступим. Поскольку реальный код я публиковать не могу, рассмотрю на академических примерах.

Задача 1 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, дождаться результата их выполнения и выдать ответ.
Для решения задачи 1 я воспользовался этим переводом, ибо был очарован простотой использования библиотеки.
import multiprocessing.dummy as multiprocessing
import time

def a():
    time.sleep(2)
    return 'a'
def b():
    time.sleep(2)
    return 'b'
def c():
    time.sleep(1)
    return 'c'

p = multiprocessing.Pool()

results = p.map(lambda f: f(),[a,b,c])
print(results)
p.close()
p.join()



Результат выполнения кода:
['a', 'b', 'c']

Замечательно, но есть существенный минус. Время выполнения кода не ограничено, он будет ждать результата выполнения всех процедур. Изменяем формулировку задачи.

Задача 2 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, и спустя интервал времени проверить, завершились они или нет, выдать результат.

Для решения используем ту же библиотеку, но уже функцию map_async. Ее отличие в том, что она возвращает объект AsyncResult.

import multiprocessing.dummy as multiprocessing
import time

def a():
 time.sleep(2)
 return 'a'

def b():
 time.sleep(2)
 return 'b'

def c():
 time.sleep(1)
 return 'c'

p = multiprocessing.Pool()

result = p.map_async(lambda f: f(),[a,b,c])

TIMEOUT  =3
print(results.get(TIMEOUT))

p.close()
p.join()



Результат выполнения при TIMEOUT>=3 такой же, как и в предыдущем случае, но если хоть одна из процедур не успевает завершится, выдается исключение TimeoutError. Однако и этот результат меня устроил не вполне. Дело в том, что в моем случае мне существенно было, чтобы успевала отработать одна функция, остальные могли и отсутствовать при выдаче.

Задача 3 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, дождаться результата функции a.

import multiprocessing.dummy as multiprocessing
import time


def a():
 time.sleep(2)
 print(1)
 return 'a'


def b():
 time.sleep(3)
 print(2)
 return 'b'

def c():
 time.sleep(1)
 print(3)
 return 'c'

p = multiprocessing.Pool()

results=[]
for r  in p.imap(lambda f: f(),[a,b,c]):
 results.append(r)
 break

print(results)
p.close()
p.join()




Результат выполнения:
3
1
['a']
2


Как видно, хотя отработали 2 функции из 3, результат мы получили только для приоритетной. Чтобы получить результат второй, следует использовать imap_unordered:

results=[]
for r  in p.imap_unordered(lambda f: f(),[a,b,c]):
 results.append(r)
if  r =='a':
	break

Результат:
3
1
['c', 'a']
2


Что, если нам в основном потоке нужен результат только одного потока, наиболее быстрого? Достаточно убрать вызов p.join() из предыдущего примера и выйти из цикла по первому результату.

Теперь еще такой момент. При попытке использовать модуль multiprocessing, который работает с процессами, вместо multiprocessing.dummy, работающего с тредами будет выдана ошибка сериализации cPickle.PicklingError, поскольку при межпроцессном взаимодействии не удается сериализовать функцию. Для того, чтобы код работал, нужно ввести функцию-псевдоним, код будет не настолько красив, но:
import multiprocessing
import time

def a():
    time.sleep(2)
    return 'a'
def b():
    time.sleep(2)
    return 'b'
def c():
    time.sleep(1)
    return 'c'

def func(param):
 if param == 'a':
    return a()
 elif param == 'b':
    return b()
 elif param == 'c':
    return c()

p = multiprocessing.Pool()

results = p.map(func,['a','b','c'])
print(results)
p.close()
p.join()

Комментарии (5)


  1. andrewnester
    16.06.2015 21:52
    +1

    Извиняюсь, не знаю детали Вашего проекта и архитектуры, но почему бы для неприоритетных задач не использовать RabbitMQ например?
    Более масштабируемое и «чистое» решение


    1. veloriba
      16.06.2015 23:03
      +2

      Вероятно они приоритетны. Это может быть получение цен или другой (дополнительной) информации из разных источников. Т.к. в зависимости от архитектуры/времени/канала источники могут подтупливать, но страницу надо отрисовать за конечное кол-во времени с хоть какой нибудь информацией.


      1. random1st Автор
        17.06.2015 08:56

        Вы абсолютно правы. К тому же неприоритетные задачи все равно выполнятся, просто после ответа сервера, таким образом, те данные, которые необходимы, попадут в кеш, и в следующий раз точно уложатся в гарантированный ответ сервера. Даже, когда данные отдаются из кеша, скорость отдачи их в 2 раза выше по сравнению с однопоточным вариантом.


    1. random1st Автор
      17.06.2015 08:58

      Я не думаю, что это решение уложится в 3 строчки кода.


  1. Lovesuper
    07.07.2015 09:06
    -1

    Думаю, в любом случае, лучше было бы использовать какой-нибудь celery. Получить запрос, выплюнуть все тяжелые задачи в celery, отрисовать страницу, показать на фронденде данные по мере их возвращения.