Одним из первых моих проектов было распознавание рукописных анкет и перевод их в формат Excel для загрузки в CRM в качестве лидов. Сейчас анкеты обрабатываются операторами, в пиковом сезоне время обработки достигает трех недель. За это время лид успевает остыть, как следствие - упущенная прибыль для бизнеса. Появилась идея обрабатывать рукописные анкеты с помощью моделей OCR. Это существенно время обработки информации и взаимодействия с потенциальным клиентом. В качестве пользовательского интерфейса используем Телеграм-бот, с которым взаимодействует оператор. На вход подается PDF файл с анкетами, на выход получаем CSV файл для заливки в CRM систему. От написания своей модели OCR на данном этапе отказались и выбрали готовую технологию Yandex OCR. Взаимодействие с моделью происходит посредством post-запроса, его подробно описывать не буду, на это есть документация от Yandex, там все подробно рассказано. Отмечу только, что результат распознания мы получаем в виде структурированного JSON распознанных блоков текста, строк и слов с указанием их местоположения на изображении.

Пример анкеты
Пример анкеты

Казалось бы задача распознавания текста достаточно тривиальная и давно решенная, но стоит погрузиться в современные подходы к сканированию и передаче информации, проанализировать текущие бизнес-процессы, как все становится не так однозначно.

Первое с чем я столкнулся - это то, что сканером (большой ящик с крышкой и стеклом) никто уже почти не пользуется. Представитель, который занимается сбором анкет, формирует PDF файл, посредством сканирования через телефон. Отсюда возникает ряд проблем:

  • Разрешение полученных изображений из PDF разное

  • Проблемы с перспективой и горизонталью изображения

  • На одной странице может быть несколько анкет, но в список на загрузку они должны попасть как отдельные элементы

Также необходимо было предусмотреть ситуацию для развития проекта и оптимизации бизнес- процесса, когда представитель делает снимок документа и отправляет его через блок распознания непосредственно в CRM. В этом случае тип файла для обработки будет не PDF, а IMAGE.

Собрав воедино все требования по универсальности ожидаемого решения, я начал искать в интернете примеры реализации алгоритмов разбора распознанного текста и его структуризации. Не найдя ничего подходящего, начал прорабатывать идею сам и решил поделиться своим опытом с сообществом. Надеюсь он будет кому-то полезен.

Первое с чего я начал и в итоге получил хорошую оптимизацию - это загрузка файлов и перевод их в тип IMAGE, для дальнейшей отправки на распознавание. Дело в том, что у Yandex несколько базовых моделей OCR: для работы с изображением, для работы с одностраничным PDF и для многостраничного PDF. Последний вариант использует двухступенчатый запрос: сначала файл сохраняется на сервере, затем с помощью get - запроса отправляется в OCR - модель.

А в моем случае, по требованиям, на вход мы можем получить как многостраничный PDF так и фото анкеты. Что тогда делать? Определять на входе тип файла и в зависимости от типа отправлять в подходящий блок запроса?

А что если пользоваться одной моделью OCR, которая работает с изображениями и обрабатывать PDF внутри своей программы, деля его на страницы с дальнейшим переводом в IMAGE?.

Решая эту задачу я нашел отличную библиотеку Fitz (PyMuPDF - актуальное название пакета), которая принимает на вход не только PDF но и изображения, переводит если надо полученный файл в изображение и сохраняет его в папке. Таким образом получается легкий универсальный обработчик, и далее мы работаем только с одной моделью Yandex OCR, которая обрабатывает изображения:

def file_to_png(file_path, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    pdf_document = fitz.open(file_path)

    # Проходим по всем страницам PDF и сохраняем их в формате PNG
    for page_number in range(len(pdf_document)):
        page = pdf_document.load_page(page_number)
        image = page.get_pixmap()

        
        image.save(os.path.join(output_folder, f'img{page_number+1}.png'))

    pdf_document.close()

Далее можно переводить изображения в формат base64 и отправлять запрос на распознавание, но стоит вспомнить вот о чем: изображение может быть повернуто относительно горизонтали. В этом случае покоординатный разбор результата усложнится, и на выходе мы не получим качественных структурированных данных. Это был для меня неочевидный момент, который я понял, пройдя весь исследовательский путь разработки до получения финального результата. В написании статьи я буду двигаться последовательно, по отработанному алгоритму. Поэтому напишем функцию, которая использует библиотеку OpenCV, находит на изображении линии, близкие к горизонтальным, вычисляет средний угол, и делает поворот изображения на минус средний угол. Тем самым исходное изображение выравнивается относительно горизонтали:

def average_slope(lines):
    slopes = []
    for line in lines:
        for x1, y1, x2, y2 in line:
            slope = np.arctan2(y2 - y1, x2 - x1) * (180 / np.pi)
            slopes.append(slope)
    # Фильтруем вертикальные линии, которые имеют угол ~90 или ~-90 градусов
    slopes = [slope for slope in slopes if not np.isclose(abs(slope), 90, atol=10)]
    return np.mean(slopes) if len(slopes) > 0 else 0

def rotate_image(image_path, rotate_folder):
    if not os.path.exists(rotate_folder):
        os.makedirs(rotate_folder)
    
    img = cv2.imread(image_path)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Обнаружение краев на изображении
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)

    # Обнаружение линий на изображении
    lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, minLineLength=100, maxLineGap=10)

    # Вычисление среднего угла наклона линий
    angle = average_slope(lines)

    # Получение размеров изображения
    (h, w) = img.shape[:2]

    # Вычисление центра изображения
    center = (w // 2, h // 2)

    # Применение поворота
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)

    # Формирование уникального имени файла для каждого фрагмента
    output_path = os.path.join(rotate_folder, f'rotated_{os.path.splitext(os.path.basename(image_path))[0]}.jpg')

    # Сохранение результата
    cv2.imwrite(output_path, rotated)

После выравнивания изображений по горизонтали, можем кодировать их в base64 и отправлять в Yandex OCR:

# Кодировка файла в base 64
def encode_file(file):
  with open(file, 'rb') as file:
    file_content = file.read()
    return base64.b64encode(file_content).decode('UTF-8')

Далее, поскольку мы работаем с многостраничным файлом, создаем пустой список, передаем постранично наши изображения в блок распознания и записываем полученные JSON - файлы в список:

result = []
for img in os.listdir('proc/rotated'):
    image_path = os.path.join('proc/rotated', img)
    
    # Заголовки запроса
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {IAM_TOKEN}',
        'x-folder-id': folder_id,
        'x-data-logging-enabled': 'true'
    }

    body = {
        "mimeType": "image",
        "languageCodes": ["ru", "en"],  
        "model": "handwritten",
        "content": encode_file(image_path)
    }
   
    # Отправка POST-запроса
    response = requests.post(VISION_URL, headers=headers, json=body)
    result.append(response)

Получив массив JSON-ов для дальнейшего разбора, необходимо извлечь текст с координатами:

proc_list = []

for res in result:
    data = json.loads(res.content.decode('UTF-8'))
    elenco= []
    for block in data["result"]["textAnnotation"]["blocks"]:
        width = data["result"]["textAnnotation"]["width"]  # Получаем ширину
        height = data["result"]["textAnnotation"]["height"]  # Получаем высоту
        for line in block['lines']:
            #x_coord_line = [(vertex['x']) for vertex in line['boundingBox']['vertices']]
            #y_coord_line = [(vertex['y']) for vertex in line['boundingBox']['vertices']]
            for word in line['words']:
                text = word['text']
                x_coord = [(vertex['x']) for vertex in word['boundingBox']['vertices']]
                y_coord = [(vertex['y']) for vertex in word['boundingBox']['vertices']]
                elenco.append({"text": text, "x_coord": x_coord, "y_coord": y_coord, "width": width, "height": height})
    proc_list.append(elenco)

Сформировав список словарей можно переходить к покоординатному разбору результата и записи его в датафрейм. Но тут возникает один нюанс: если на отсканированном листе был один документ, то проблем не возникает, каждый словарь из списка содержит результат распознавания одного документа. Но если документов было несколько, то для корректного разбора по ключевым полям и записи в датафрейм в формате: одна строка - один документ, необходимо делить результат распознавания по количеству документов на одной странице и записывать их в отдельные словари.

Чтобы решить эту проблему можно было изначально определять сколько на листе анкет, и с помощью OpenCV делать вырезку и сохранять эти анкеты в отдельные файлы и после этого передавать их в блок распознания. Попробовав на практике решить эту задачу, я от этого подхода отказался, потому что он не получился полностью универсальным: для хорошего результата вырезки нужна была явная рамка вокруг анкеты.

А какой способ более универсальный? В любой анкете есть слово "Анкета", которое написано крупным шрифтом и в случае с Yandex OCR распознается в 100 % случаев. Соответственно мы можем обработать словари и разделить их, если ключевое слово встречается несколько раз. Тем самым мы делим документы, расположенные на одной странице изображения уже после обработки модулем OCR:

def split_on_keyword(data, keyword):
    result = []
    current_list = []

    for item in data:
        if item['text'].lower().strip() == keyword:
            if current_list:
                result.append(current_list)
            current_list = [item]
        else:
            current_list.append(item)
    
    # Добавляем последний список, если он не пустой
    if current_list:
        result.append(current_list)

    return result

Теперь рабочий список сформирован и можно переходить к покоординатному разбору и записи результата в датафрейм.

Алгоритмов разбора было опробовано несколько типов: самый очевидный - это для каждого типа анкеты прописать маску разбора по относительным координатам, указав в каком конкретно месте находится распознанное значение, соответствующее заданному ключу. Но здесь опять же теряется универсальность, необходимо явно указывать тип анкеты и для каждого типа писать свою маску.

Но что если сначала определить список всех встречающихся полей в анкетах, а затем из нашего массива слов с координатами отобрать те, которые встречаются в этом списке и определить их как ключи словаря с координатами начала и конца диапазона, в котором будет лежать распознанное значение?

field_list = [
    'Фамилия',
    'ФИО',
    'ФИО родителя',
    'Имя',
    'Отчество',
    'Дата Рождения',
    'E-mail',
    'Телефон',
    'Телефон родителя',
    'Мобильный телефон',
    'Город',
    'Хобби',
    'Регион',
    'Населенный пункт',
    'Адрес',
    'Паспорт',
    'Дата выдачи',
    'Название учебного заведения',
    'Школа/Колледж',
    'Курс\класс'
]

Как видно выше из списка, ключевые поля могут состоять из нескольких слов. В результате работы блока OCR, мы убедились, что последовательные слова в строке, также последовательно записываются в массив. Значит можно написать рекурсивную функцию, которая будет сравнивать текущее и последующее поле слова и записывать конкатенированное значение в новый список если было найдено соответствие с field_list до тех пор, пока не будет больше найдено ни одного соответствия.

Для сопоставления строк используем небольшую функцию нормализации и делаем небольшой допуск в один символ на ошибки распознания с помощью расстояния Левинштейна:

# Функция нормализации строки для поиска ключей в анкете
def normalize_string(string):
    # Удаляем все неалфавитные символы и приводим строку к нижнему регистру
    return re.sub(r'[^a-zA-Zа-яА-Я0-9]', '', string).lower()


  def merge_texts(sublist, field_list):
    new_list = []
    miss_next = False
    found_next = False
    for i in range(0, len(sublist) - 1):
        if miss_next:
            miss_next = False
            continue
        found = False
        merged_text = sublist[i]['text'] + ' ' + sublist[i + 1]['text']
        for el in field_list:
            if normalize_string(merged_text) in normalize_string(el) or\
                levenshtein_distance(normalize_string(merged_text),normalize_string(el))<=1:
            
                new_dict = {
                    'text': merged_text,
                    'x_coord': sublist[i + 1]['x_coord'],
                    'y_coord': sublist[i + 1]['y_coord'],
                    'width': sublist[i + 1]['width'],
                    'height': sublist[i + 1]['height']
                }
                new_list.append(new_dict)
                found = True
                found_next = True
                miss_next = True
                break
            
        if not found:
            new_list.append(sublist[i])
    
    # Добавляем последний элемент, если он не был объединен
    if not miss_next and len(sublist) > 0:
        new_list.append(sublist[-1])
    
    # Если были найдены объединения, рекурсивно вызываем функцию с новым списком
    if found_next:
        return merge_texts(new_list, field_list)
    else:
        return new_list

После того, как все сочетания полей были найдены, разделяем список на два: в один запишем все ключи с своими координатами, а так же координатой конца диапазона поля (это нужно потому, что на одной строке может располагаться несколько полей и нужно поэтому ограничить область поиска результата), в другом оставим значения, которые далее мы будем сопоставлять с ключами.

def process_text_fields(spisok, field_list):
    ext_list = {}
    for item in spisok:
        for field in field_list:
            if levenshtein_distance(normalize_string(item['text']),normalize_string(field))<=1 or\
                normalize_string(item['text']) == normalize_string(field) and len(item['text']) >= 3:
            
                if field in ext_list:
                    continue
                else:
                    ext_list[field] = {
                                    'x_coord': item['x_coord'],
                                    'y_coord': item['y_coord'],
                                    'width': item['width'],
                                    'height': item['height']}
                    break

    # Находим окончание поля для рукописного текста
    for key in ext_list: 
        found = False
        for other_key in ext_list:
            if key != other_key:
                if abs(int(ext_list[key]['y_coord'][1])/int(ext_list[key]['height'])*100 - int(ext_list[other_key]['y_coord'][1])/int(ext_list[other_key]['height'])*100) <= 1.2 and\
                int(ext_list[key]['x_coord'][1])< int(ext_list[other_key]['x_coord'][1]):
                    ext_list[key]['end_segment'] = ext_list[other_key]['x_coord'][1]
                    found = True
                    break
        if not found:
            ext_list[key]['end_segment'] = ext_list[key]['width']
    
    # после того, как все ключи найдены и записаны в ext_list, формируем очищенный список значений filtred_spisok для дальнейшего разбора
    filtered_spisok = []

    for i in spisok:
        # Устанавливаем флаг, указывающий на необходимость удаления элемента
        remove_item = False
        # Перебираем ключи в ext_list
        for key in ext_list:
            # Проверяем условие, если текст элемента i входит в ключ key
            if normalize_string(i['text']) in normalize_string(key) and len(i['text']) >= 3:
                remove_item = True
                break  # Если условие выполнено, выходим из внутреннего цикла
        # Если флаг не установлен, добавляем элемент в новый список
        if not remove_item:
            filtered_spisok.append(i)
    return ext_list, filtered_spisok

И далее сопоставляем эти два списка и формируем по координатному диапазону ключа, строку значений

def process_extract_text(text_fields, extracted_data):
    for key in extracted_data:
        proc_string = ''
        for item in text_fields:
            if abs(int(item['y_coord'][1])/int(item['height'])*100 - int(extracted_data[key]['y_coord'][1])/int(extracted_data[key]['height'])*100) <=1.2 and\
            int(extracted_data[key]['x_coord'][2]) <= int(item['x_coord'][1]) < int(extracted_data[key]['end_segment'])-5:
                proc_string += item['text']+' ' 
        extracted_data[key]= proc_string

Все результаты работы функции process_extract_text, записываются в финальный датафрейм, который может передаваться в качестве шаблона на загрузку в CRM

Заключение

В результате такого подхода мы получили универсальный алгоритм разбора результата работы модуля OCR для структурированных документов, который не зависит от типов документов, разрешения изображения, расположения на странице и количества документов на одной странице. Интегрируем описанные функции в Телеграм-бот (я писал его на Aiogram) и получаем готовый продукт в довольно сжатые сроки, удобный для использования и без потребности писать front end. Еще один блок этого проекта, достойный описания -это определение и вывод результатов распознавания чек-боксов. Об этом я напишу в следующей статье.

Комментарии (0)