Исходными данными являлись 10 тыс. json-файлов, в каждом из которых было около 3 тыс. конечных узлов. При последовательной загрузке всех файлов и преобразовании в pandas при помощи json_normalize время выполнения составило 10 минут, что довольно долго. В связи с тем, что данную операцию необходимо было выполнить несколько раз на различных наборах данных, было принято решение ускорить процесс преобразования за счет реализации механизма распараллеливания и реализовать его как полноценный настраиваемый скрипт.
Для создания параллельных вычислений нам важны два условия: разделимость и независимость данных. При работе с json файлами, каждый из них представляет одну строку результирующей таблицы, что позволяет спокойно разделить их на партиции. Также для преобразования одного файла нам не нужен какой-либо другой что удовлетворяет условию независимости данных.
Условия соблюдены, можно приступать к коду.
Импортируем библиотеки:
from tqdm import tqdm
from pathlib import Path
import json
from multiprocessing import Pool, RLock
import pandas as pd
import pickle
import argparse
Весь алгоритм программы разделен на следующие блоки:
Парсинг аргументов.
Получение всех путей до файлов и разделение их на n групп (Для n подпроцессов).
Запуск процессов, выполняющих выгрузку и преобразование в pandas.
Объединение результатов выполнения подпроцессов в единый фрейм и сохранение.
Парсинг аргументов
Для того, чтобы скрипт можно было удобно переиспользовать с другими настройками, определим входные параметры скрипта с помощью библиотеки argparse. Отделим инициализацию нашего парсера аргументов от основной логики скрипта, описав ее в функции get_arg_pareser:
def get_arg_pareser():
parser = argparse.ArgumentParser(description=
'From json files creates pd.DataFrame')
parser.add_argument('-i', '--input-folder', type=str,
help='input data folder', required=True)
parser.add_argument('-o', '--output-file', type=str, default=r'output.pickle',
help='output.pickle')
parser.add_argument('-e', '--n-executors', type=int, default=8,
help='number of subprocesses (default: 8)')
return parser
Вызовем его в блоке __main__:
args = get_arg_pareser().parse_args()
N_GROUPS = args.n_executors
jsons_folder_path = args.input_folder
output_file = args.output_file
Пути до файлов
При помощи библиотеки Pathlib получим все пути до наших входных файлов:
f_paths = list(Path(jsons_folder_path).glob('*.json'))
Теперь разделим данные на n частей, пронумеровав каждую партицию (Для красивого отслеживания выполнения процесса).
in_group = len(f_paths) // N_GROUPS + 1
inp_args = [f_paths[i:i + in_group] for i in range(0, len(f_paths), in_group)]
inp_args = list(enumerate(inp_args))
Распараллеливание
В любой задаче, в которой необходимо создать распараллеливание, нам необходимо определить логику выполнения подпроцесса в отдельном блоке (функции). Создадим функцию one_process_execution, определяющей обработку одной партиции данных.
Функция в рамках одного процесса выгружает все json файлы партиции, сохраняя два массива: названия файлов (indexes) и сами данные (res_array). После выгрузки всех данных при помощи функции pd.json_normalize преобразуем список словарей в таблицу и выставим имена файлов как индекс.
Важным замечанием является то, что функцию json_normalize следует выполнять именно на массиве словарей внутри подпроцессов, а не на каждом отдельном файле. Если же поставить преобразование в pandas каждого файла отдельно и итерировано добавлять в pd.DataFrame по строчке, то это замедлит выполнение в 3 раза. Главное правило преобразования чего-либо в pandas, делать саму трансформацию как можно позднее.
def one_process_execution(pid, f_paths):
res_arr = []
indexes = []
tqdm_text = '#' + f'{pid}'.zfill(3)
with tqdm(total=len(f_paths), position=pid+1, desc=tqdm_text) as pbar:
for path in f_paths:
with open(str(path), 'r') as f:
d = json.load(f)
indexes.append(path.stem)
res_arr.append(d)
pbar.update(1)
df = pd.json_normalize(res_arr).assign(index=indexes).set_index('index')
print(f'Subproc {pid} done')
return df
Теперь, когда у нас есть разделенные данные и есть описание функции подпроцесса – время приступать к созданию пула (контейнера) процессов. Для реализации параллельных вычислений в Python используется библиотека multiprocessing. При помощи класса Pool инициализируем наш контейнер задач, указав необходимое число подпроцессов и дополнительные параметры для работы отображения статуса выполнения. Далее заполним этот пул описанной выше функции one_process_execution с входными данными, сформированными на шаге 1. Добавление задач происходит с помощью ключевого слова apply_async, определяющего поведение выполнения наших процессов.
pool = Pool(processes=N_GROUPS, initializer=tqdm.set_lock, initargs=(RLock(),))
jobs = [pool.apply_async(one_process_execution, args=x) for x in inp_args]
Следует отметить, что метод apply_async не запускает сам процесс, а лишь добавляет его в список задач. Таким образом, в переменной jobs у нас хранятся определения подпроцессов, готовых к началу выполнения.
Для запуска вычислений нам надо у каждого элемента массива вызвать метод get, который после выполнения вернет результат нашей функции, по мере выполнения. Вызовем этот метод для каждого элемента массива задач.
# run pool
df_lists = list(map(lambda x: x.get(), jobs))
Объединение и сохранение
Теперь дело за малым: осталось объединить все датафреймы в один единый и сохранить:
res_df = pd.concat(df_lists)
# save
with open(output_file, 'wb') as f:
pickle.dump(res_df, f)
Таким образом, был создан скрипт, позволяющий преобразовывать большое количество json файлов в единый датафрейм, используя технологии распараллеливания вычислений. Данная реализация преобразовала те же 10 тысяч файлов в единый датафрейм за 2 минуты, тем самым ускорив процесс в 5 раз.
Полноценный скрипт можно посмотреть на моей личной странице github. Он был реализован таким образом, что может быть запущен в двух режимах:
Как скрипт, сохраняющий результат как pickle файл.
Как подключаемый модуль, функция main которого возвращает сгенерированный pd.DataFrame.