Привет, Хабр!

Меня зовут Жеронкин Антон, я Data Engineer и участник профессионального сообщества NTA.

Иногда специфику работы дата‑инженера можно описать следующей картинкой:

И сейчас я расскажу почему: в одном проекте необходимо было использовать датасет, представляющий из себя около 5 млн. статей и связанных с ними сущностей (авторы, издательства, и т. д.).

Отмечу, что перед командой стояла задача адаптировать и загрузить этот датасет в PostgreSQL. При адаптации датасета необходимо было сохранить все связи между сущностями «Статья»‑»Автор», «Статья»‑»Издательство», «Статья»‑»Ключевое слово». Можно поспорить, что такой датасет можно сразу загрузить в MongoDB, ведь она является документоориентированной СУБД и гораздо лучше заточена для работы со слабоструктурированными сущностями, но в нашей команде не было достаточных навыков работы с ней.

Датасеты поставляются в формате одного JSON‑файла. Ниже привожу выдержку из описания датасета, составленного его авторами — с полной версией можно ознакомиться на странице датасета.

Какой вывод можно сделать из этого описания? У датасета есть структура, и она, на первый взгляд, кажется простой — создаётся впечатление, что датасет можно легко трансформировать в любой нужный нам вид. При его преобразовании в валидный для реляционных СУБД вид, достаточно будет извлечь отдельные столбцы, образовать связи между таблицами путем извлечения пар их ID. Так думал и я, решив, что преобразование этого датасета не займет много времени.

Первое, с чем мне пришлось столкнуться — впечатляющий размер датасета (17 ГБ) и встречающиеся иногда невалидные (нарушен синтаксис, баланс скобок) JSON‑записи.

Второе — записи в датасете слабоструктурированы. Слабоструктурированные сущности в данном контексте — это сущности, под хранение которых нет чётко и однозначно описанной структуры данных, как в тех же реляционных СУБД. В соседних записях половина атрибутов может быть общей для всех, а другая половина — быть уникальной для данной конкретной записи.

Таким образом, требовалось понять, что датасет из себя представляет и провести разведочный анализ. Было решено подгружать датасет в pandas с разбивкой по группам строк (чанкам), т.к.:

  • Сырой JSON‑файл размером в 17 ГБ целиком не загрузить как в текстовый редактор, так и в словарь через библиотеку json. В pandas через доп.параметр chunksize этот момент можно контролировать

  • Неясно, сколько на самом деле атрибутов, как они форматированы в JSON, насколько сложно их будет читать в сыром виде. Pandas хотя бы на верхнем уровне может представить записи в виде таблицы с плоской структурой.

    Как и ожидалось, связанные со статьей сущности (авторы, издания), оформлены не в виде плоской структуры (одна строка — одна статья — один автор), а в виде вложенных списков внутри столбцов таблицы:

Для того, чтобы такие отношения между статьями и авторами, изданиями корректно отразить в реляционной СУБД, нам необходимо выровнять сущности, нормализовать и правильно смоделировать связь «Многие‑ко‑многим».

Если эти термины незнакомы, покажу на пальцах, как это работает. Допустим, мы имеем следующую таблицу:

И мы придерживаемся следующего порядка действий:

  1. Преобразуем таблицу в первую нормальную форму (1НФ):

В новой структуре таблицы одной связи будет соответствовать ровно одна строка.

  1. Дополнительно присвоим каждому из значений столбцов уникальный ID:

  1. Разбиваем таблицу на две, а для их связи создаём дополнительную таблицу, в которой будут храниться пары ID подписчиков и издателей:

Естественно, пример, который я привёл выше, является сильно упрощенным «объяснением на пальцах», однако он позволяет продемонстрировать, что будет происходить дальше. Теперь я реализую это на Python:

import itertools
import pandas as pd

# читаем json порциями по 10000 записей
chunks = pd.read_json(raw_data_line, chunksize=10000, lines = True)
lst_authors = []
lst_paper_author = []
lst_venues = []
lst_references = []

i=0

# цикл обработки
for chunk in chunks:
    
    chunk.fillna('', inplace = True)
    #АВТОРЫ
    

    paper_author_tuples = [([x[0]], x[1]) 
                             for x in list(zip(chunk['_id'], chunk['authors'].fillna(''))) 
                             if x[1] != '']

    paper_author_pairs = [item for x in paper_author_tuples 
                          for item in list(itertools.product(x[0],x[1]))]

    paper_author_pairs = [(x[0], {k:v for k,v in x[1].items() 
                                  if k in ['_id', 'name', 'org']}) 
                          for x in paper_author_pairs ]

    paper_author_pairs = fix_ids(paper_author_pairs)
 
    paper_author = [{'paper_id': x[0], 'author_id': x[1]['_id']} 
                                for x in paper_author_pairs]

    authors = [x[1] for x in paper_author_pairs]
    
    lst_paper_author.extend(paper_author)
    lst_authors.extend(authors)

    # Издательства (venues)
    # аналогичная обработка, только у одной статьи не много издательств, а одно
  
    # references
    
    if 'references' in chunk:
        references_tuples = [([x[0]], x[1]) 
                             for x in list(zip(chunk['_id'], chunk['references'])) 
                             if x[1] != '']
        references_pairs = [item for x in references_tuples
                          for item in list(itertools.product(x[0],x[1]))]
        lst_references.extend(references_pairs)
        
        del chunk['references']
    
    del chunk['authors']
    
    
    chunk.to_csv(f'clearsets\\papers-{i}.csv', sep='\t', encoding='utf-8')
    i+=1


# формируем csv с уникальными авторами и их id
pd.DataFrame(lst_authors).drop_duplicates(subset='_id').to_csv(f'clearsets\\authors.csv', sep='\t', encoding='utf-8')

# формируем csv со связями статей и авторов
pd.DataFrame(lst_paper_author).to_csv(f'clearsets\\paper_author.csv', sep='\t', encoding='utf-8')

# формируем csv со ссылками статей друг на друга
pd.DataFrame(lst_references).to_csv(f'clearsets\\references.csv', sep='\t', encoding='utf-8')

Пояснения к отдельным вызовам функций я прикрепил в комментариях. В целом, логика работы с датасетом соответствует той, что я описывал ранее.

# извлекаем пары значений (id статьи, [все авторы в статье])

    paper_author_tuples = [([x[0]], x[1]) 
                             for x in list(zip(chunk['_id'], chunk['authors'].fillna(''))) 
                             if x[1] != '']

    # раскрытие списков на вторых позициях кортежей
    # создание уникальных пар (один id статьи, один автор)

    paper_author_pairs = [item for x in paper_author_tuples 
                          for item in list(itertools.product(x[0],x[1]))]

    # фильтрация ключей в авторах, очистка от ненужных атрибутов

    paper_author_pairs = [(x[0], {k:v for k,v in x[1].items() 
                                  if k in ['_id', 'name', 'org']}) 
                          for x in paper_author_pairs ]
 

    # пары айдишников для связи авторов и статей
    paper_author = [{'paper_id': x[0], 'author_id': x[1]['_id']} 
                                for x in paper_author_pairs]

    authors = [x[1] for x in paper_author_pairs]
    
    lst_paper_author.extend(paper_author)
    lst_authors.extend(authors)

Попробуем выполнить этот код.

Оказывается, у некоторых авторов нет даже ID, о чем нам красноречиво сигнализирует эта ошибка.

Чтобы обойти этот недостаток датасета, напишем функцию генерации недостающих ID и интегрируем её в обработку.

import uuid

def fix_ids(pairs):    
    for pair in pairs:
        if '_id' not in pair[1]:
            pair[1]['_id'] = uuid.uuid4().hex[:24]
    return pairs


# создаем отсутствующие айдишники
paper_author_pairs = fix_ids(paper_author_pairs)

Выбор 24-символьного UUID обусловлен тем, что этот формат ID уже применяется в датасете, и у меня нет особой потребности переделывать их под какой‑то другой формат.

За скобками я оставил дополнительный скрипт, который проверяет уникальность каждого ID — поскольку ID будут использоваться как первичные ключи таблиц, необходимо исключить их дублирование. Хотя эта проверка у меня не выявила дублирования (кстати, СУБД тоже нормально приняла датасет). Этими проверками пренебрегать не стоит.

После того, как датасет обработан, остается залить его в заранее подготовленные таблицы и можно спокойно им пользоваться.

Подведем итоги

Когда мы сталкиваемся с большим ненормализованным датасетом, да еще и в формате JSON, который нужно переложить в связанные SQL‑таблицы, мы:

  • Читаем датасет по чанкам;

  • Анализируем датасет на качество, смотрим на атрибуты;

  • Нормализуем датасет, раскрываем связи между сущностями и следим за их целостностью.

В целом, знание этих пунктов позволит сразу адаптировать датасет под реляционные СУБД и значительно сократить время, затраченное на его обработку, ведь со структурной точки зрения его не потребуется переделывать бессчётное количество раз. А для всего остального — есть функционал SQL. Удачи!

Комментарии (4)


  1. Akina
    00.00.0000 00:00

    Если исходный JSON не содержал синтаксических погрешностей (с точки зрения JSON), то весь импорт можно было выполнить чисто средствами Postgre, вообще не привлекая к этому клиента на питоне.

    и встречающиеся иногда невалидные (нарушен синтаксис, баланс скобок) JSON‑записи

    Что-то я не увидел, где выполняются валидация и корректировка...


    1. NewTechAudit Автор
      00.00.0000 00:00

      Добрый день! В моем случае, JSON было необходимо почистить от подстроки «NumberInt», которая была сохранена в JSON вне формата и ломала его. Т.к. статья про извлечение реляционных датасетов из построчного – посчитал нужным опустить описание корректировки и валидации, впредь учту.

      За подсказку про импорт средствами PostgreSQL спасибо.

      Так как JSON не плоский, в полях-внешних ключах (авторы, ключевые слова, etc) будут содержатся списки JSON-сущностей, которые средствами SQL придётся раскрывать также, как и здесь на Python – если необходимо использовать эти связи и строить агрегаты. Или встроенными средствами можно раскрыть эти связи из одного JSON сразу в несколько таблиц? Буду рад, если подскажете.


  1. mst_72
    00.00.0000 00:00
    +2

    Исходный JSON - это типа одна строка - один объект? Или что там было? Судя по "lines = True" так и есть.

    А, значит, предобработку можно было или вне пандаса делать (убрать "синтаксические погрешности") на чистом питоне. Ну или вообще на спарке (было бы даже прикольнее).

    А так как-то странно... Особенно с учётом зачем-то вывода в CSV


    1. NewTechAudit Автор
      00.00.0000 00:00

      Добрый день! Спасибо за обратную связь. В данном случае чуть сложнее, чем Вы описали.. В датасете одна строка – один объект (статья), НО в каждом из этих объектов вложен список других объектов, представляющих собой отделяемые от статей сущности (авторы, ключевые слова, etc).

      Если бы было достаточно работать со записями статей без агрегатов по тем же авторам – согласен, можно было бы обойтись корректировкой и валидацией, после чего загружать в БД. Но работа со связанными сущностями в этом конкретном случае потребовалась, и пришлось придумывать решение.

      Вывод в CSV можно легко переделать в связку с pandas.to_sql() + psycopg/sqlalchemy – этот момент я оставляю на усмотрение читателя.