Привет, Хабр!

На связи участники профессионального сообщества NTA Кухтенко Андрей, Кравец Максим и Сиянов Артем.

Навигация по посту

Постановка задачи

Любой текст — это не просто коллекция слов, он содержит мысли и намерения автора. Но вручную невозможно обработать огромное количество текстовой информации и понять какие данные они могут содержать. В таком случае нам поможет кластеризация текста, которая позволит получить представление о данных.

Существует большое количество библиотек, предоставляющих различный функционал в этой области, одной из которых является модуль ml фреймворка Apache Spark, считающегося стандартом в области обработки больших данных. Данный фреймворк входит в экосистему Hadoop — платформы для распределенного хранения и обработки большего объема данных. Несмотря на то, что его функционал не так широк, как возможности, предоставляемые, например, scikit‑learn, он позволяет в короткие сроки обработать огромное количество информации на распределенном кластере. Для работы с данным фреймворком на языке Python существует интерфейс PySpark.

Мы не нашли ни одной статьи на русском языке, которая была бы конкретно посвящена этой задаче, а в статьях на английском языке тоже было мало информации и объяснений. Поэтому мы и решили написать пост.

Статьи на английском, которые мы нашли:

Для кластеризации текста с помощью pyspark.ml мы создали таблицу, содержащую в себе sms‑сообщения, имитирующие настоящие. Сообщения будут содержать важную для нас информацию о покупках, оплатах и сбоях при выполнении этих операций. Пример данных из этой таблицы приведен ниже:

Наша задача состоит в том, чтобы разбить все сообщения на группы, каждая из которых будет содержать в себе сообщения одного типа.

Создание сессии Spark и импорт необходимых модулей

Для того чтобы создать Spark сессию мы написали следующий код:

from pyspark import SparkContext, SparkConf, HiveContext
# запуск сессии спарка
conf = SparkConf().setAppName('spark_dlab_app') 
conf.setAll(
    [
        #Укажите тут нужные параметры Spark
    ])
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

Все параметры сессии можно посмотреть здесь

Импортируем модули для дальнейшей работы:

# для создания пользовательских функций
from pyspark.sql.functions import udf 
# для использования оконных функций
from pyspark.sql.window import Window
# для работы с PySpark DataFrame
from pyspark.sql import DataFrame
# для задания типа возвращаемого udf функцией
from pyspark.sql.types import StringType
# для создания регулярных выражений
import re
# для работы с Pandas DataFrame
import pandas as pd
# для предобработки текста
from pyspark.ml.feature import HashingTF, IDF, Word2Vec,\
                               CountVectorizer, Tokenizer, StopWordsRemover
# для кластеризации
from pyspark.ml.clustering import Kmeans, BisectingKmeans
# для создания пайплайна
from pyspark.ml import Pipeline
# для подсчета частоты слов в тексте
from nltk.probability import FreqDist

Предварительная обработка текста

Первым делом создадим DataFrame из данных, которые находятся на Hadoop, в нашей сессии:

t = spark.table('data')

Поскольку в тексте содержится много информации, которая не несёт никакой смысловой нагрузки, например, различные цифры или знаки препинания, мы её удалим. Для этого написали UDF‑функцию, которая почистит текст с помощью регулярных выражений.

def text_prep(text):
   # переводим текст в нижний регистр
    text = str(text).lower()
   # убираем всё, что не русская буква, и убираем слово «баланс»
    text = re.sub('[^а-яё]|баланс',' ',text)
   # убираем всё, что начинается с «от»
    text = re.sub('от.+','',text)
   # убираем одиночные буквы
    text = re.sub('\s[а-яё]\s{0,1}','',text)
   # если пробелов больше одного заменяем их на один
    text = re.sub('\s+',' ',text)
   # убираем лишние пробелы слева и справа
    text = text.strip()
    return text
# создание пользовательской функции
prep_text_udf = udf(text_prep, StringType())

Применим нашу функцию и уберем пустые строки:

t = t.withColumn('prep_text', prep_text_udf('sms_text'))\
    .filter('prep_text <> ""')

В результате у нас остался текст, который содержит только интересующую нас информацию.

Для автоматизации векторизации (процесса преобразования текстовых данных
в числовые) текста мы создали конвейер (Pipeline), включающий в себя все необходимые для нее этапы. Для сравнения результатов мы в него добавили три популярных feature extractor, преобразующих текст в числовой вектор: CountVectorizer, HashingTF+ IDF и Word2Vec.

Данный конвейер содержит:

  • Tokenizer, разбивающий текст на последовательность токенов — самостоятельных смысловых единиц, в нашем случае это отдельные слова.

  • StopWordsRemover, удаляющий стоп‑слова русского языка. Если в предметной области существуют специфические стоп‑слова, отсутствующие в списке стоп‑слов PySpark, то их можно добавить вручную, как это сделать будет описано ниже.

  • HashingTF просто хеширует текст и заменяет слова на их частоты. Является аналогом CountVectorizer, в данном посте используется в сочетании с IDF (Inverse Document Frequency), которая является инверсией частоты. Применение IDF позволяет уменьшить вес широкоупотребительных слов. Важно отметить, что IDF не используется сама по себе, она применяется к результату подсчета частот слов (Term Frequency, в нашем случае к результату HashingTF).

  • Word2Vec — это модель, основанная на нейронной сети, позволяющая сохранять смысл слов в их векторном представлении. Слова, встречающиеся в тренировочном текстовом корпусе рядом с одинаковыми словами, будут иметь близкие по косинусному расстоянию векторные представления.

  • CountVectorizer заменяет текст на разреженный вектор (sparse vector), где колонки — все слова в корпусе, а значения — число вхождений каждого слова в документе.

Код для создания Pipeline приведен ниже:

Tokenizer = Tokenizer(inputCol = 'prep_text', outputCol = 'tokens')

# загрузим стоп-слова из pyspark
rus_stopwords = StopWordsRemover.loadDefaultStopWords('russian')
# загрузим локальные стоп-слова
with open('/home/datalab/nfs/stopwords.txt', 'r') as f:
    stopwords = [line.strip() for line in f]
# добавляем локальные стоп-слова к стоп-словам из pyspark
rus_stopwords.extend(stopwords)
# получим только уникальные значения из списка
rus_stopwords = list(set(rus_stopwords))
stopwordsRemover = StopWordsRemover(inputCol = 'tokens', 
                                    outputCol = 'clear_tokens', 
                                    stopWords = rus_stopwords)

hashingTF = HashingTF(inputCol = 'clear_tokens', outputCol = 'rawFeatures')
idf = IDF(inputCol = 'rawFeatures', outputCol = 'TfIdfFeatures', minDocFreq = 5)

word2Vec = Word2Vec(inputCol = 'clear_tokens', outputCol = 'Word2VecFeatures')

countVec = CountVectorizer(inputCol = 'clear_tokens', 
                           outputCol = 'CountVectFeatures')

pipeline = Pipeline(stages = [regexTokenizer, stopwordsRemover, 
  				     hashingTF, idf, word2Vec, countVec])
# применяем наш pipeline
pipeline_fit = pipeline.fit(t)
t = pipeline_fit.transform(t)

Теперь данные очищены от ненужной информации и переведены в числовой вид для дальнейшей их кластеризации. Текущий вид данных приведен на рисунке ниже:

Кластеризация

Перед началом кластеризации узнаем, сколько уникальных последовательностей токенов существует в наших данных, сделать это можно с помощью следующего кода:

t.groupBy('clear_tokens').count().count()

Всего у нас получилось 6294 таких последовательностей. Это количество слишком велико для того, чтобы кластеризовать sms просто по их значениям. Поэтому используем алгоритм кластеризации.

В Spark существуют 6 алгоритмов кластеризации: KMeans, BisectingKMeans, Latent Dirichlet allocation (LDA), Power iteration clustering (PIC), Gaussian mixture и StreamingKMeans.

StreamingKMeans используется для кластеризации потоковых данных; PIC — кластеризации графов; LDA находится в активной разработке и поэтому результаты в будущем могут оказаться неактуальными; Gaussian mixture разбивает данные на нормально распределенные кластеры, что не подходит для нашей задачи.

Поэтому для кластеризации используем алгоритмы KMeans и BisectingKMeans.

О KMeans

Все элементы разбиваются на заданное количество кластеров, для каждого кластера высчитывается центр, затем каждый элемент распределяется кластеру, расстояние до центра которого для него минимально. Данные итерации продолжаются, пока изменения центров не станут ниже заданного порога.

О BisectingKMeans

Создается один кластер, затем он делится на два кластера, используя KMeans. Полученные кластеры в свою очередь делятся, пока количество кластеров не достигнет заданного значения или становится невозможно их больше разделить.

Алгоритм BisectingKMeans мы используем для того, чтобы сравнить результаты методом KMeans, к тому же интересно посмотреть, поделит ли этот алгоритм наши данные, на меньшее количество кластеров, чем ему будет задано.

Перед этим мы посчитали количество кластеров методом локтя, и выяснили, что их количество 36.

Поскольку мы векторизовали наши данные тремя разными способами, то алгоритмы кластеризации мы должны применить к каждому из них, для этого создадим список, содержащий в себе названия колонок таблицы, где находятся векоризированные данные, и два цикла, где применим наши алгоритмы к этим колонкам. Код для кластеризации приведен ниже:

cols = ['TfIdfFeatures', 'Word2VecFeatures', 'CountVectFeatures']

for col in cols:
    kmeans = KMeans().setK(36)\
               .setFeaturesCol(col)\
                     .setPredictionCol(f'kmeans_clusters_{col}')
    km_model = kmeans.fit(t)
    t = km_model.transform(t)

for col in cols:
    bkm = BisectingKMeans().setK(36)\
                           .setFeaturesCol(col)\
                           .setPredictionCol(f'BisectingKMeans_clusters_{col}')
    bkm_model = bkm.fit(t)
    t = bkm_model.transform(t)

Теперь в нашей таблице содержится 6 колонок с результатами кластеризации: kmeans_clusters_TfIdfFeatures, means_clusters_Word2VecFeatures, kmeans_clusters_CountVectFeatures, BisectingKMeans_clusters_TfIdfFeatures, BisectingKMeans_clusters_Word2VecFeatures и BisectingKMeans_clusters_CountVectFeatures.

Чтобы узнать, какая информация содержится в каждом кластере, мы выполнили следующие действия:

  • создали список столбцов с кластерами, которые написаны выше;

  • для каждого кластера в столбце получили весь содержащийся в нем текст в виде списка;

  • с помощью функции FreqDist посчитали частоты для каждого уникального слова и взяли из него 5 первых значений;

  • создали словарь, в котором будет содержаться номер кластера и топ-слова;

  • преобразовали этот словарь в Pandas DataFrame, а его в PySpark DataFrame;

  • создали переменную, в которую будем записывать результат кластеризации, содержащий номер кластера и количество значений в нем, и соединим его с таблицей, содержащей топ-слова;

  • записали все эти таблицы на Hadoop, чтобы в дальнейшем с ними работать.

col_clusters = ['kmeans_clusters_TfIdfFeatures',   
                'kmeans_clusters_Word2VecFeatures',
   		    'kmeans_clusters_CountVectFeatures',
 		    'BisectingKMeans_clusters_TfIdfFeatures',
  		    'BisectingKMeans_clusters_Word2VecFeatures',
  		    'BisectingKMeans_clusters_CountVectFeatures']
for col in col_clusters:
    for i in range(36):
        ls = []
        tmp = t.select('clear_tokens',col).filter(f"{col} = {i}").collect()

        tmp = [tmp[j][0] for j in range(len(tmp))]

        for el in tmp:
            ls.extend(el)

        fdist = list(FreqDist(ls))[:5]

        d = {i:fdist}
        d = pd.DataFrame(list(d.items()), columns = [col, 'top_words'])
        d = spark.createDataFrame(d)

        tmp_t = t.groupBy(col).count()\
                                .orderBy('count', ascending = False)\
                                .join(d, [col])

        tmp_t.write.mode('append').saveAsTable(f'{col}_itog')

Результаты кластеризации

Чтобы оценить работу алгоритмов кластеризации и feature extractor, мы перенесли результаты в Excel и выделили одинаковыми цветами кластера, которые могли бы находиться в одном.

Результаты для HashingTF-IDF + KMeans и HashingTF-IDF + BisectingKMeans
Результаты для HashingTF-IDF + KMeans и HashingTF-IDF + BisectingKMeans

На рисунке выше видно, что в обоих случаях много кластеров, которые можно объединить в один, к тому же алгоритм BisectingKMeans разбил наши данные на 32 кластера, вместо заданных 36.

Результаты для CountVectorizer + KMeans и CountVectorizer + BisectingKMeans
Результаты для CountVectorizer + KMeans и CountVectorizer + BisectingKMeans

Мы видим, что CountVectorizer в связке с KMeams показал результат лучше, чем HashingTF+ IDF, но с BisectingKMeans он отработал хуже.

Результаты для Word2vec + KMeans и Word2vec + BisectingKMeans
Результаты для Word2vec + KMeans и Word2vec + BisectingKMeans

Word2Vec в связке с KMeans показал наилучший результат из всех, но с  BisectingKMeans он показал худший результат, поскольку разбил наши данные на 26 кластеров, вместо заданных 36.

Заключение

Хуже всего показал себя алгоритм кластеризации BisectingKMeans, поскольку выполнил лишнее разбиение кластеров с большим количеством элементов и, как результат, не выделил некоторые другие кластеры. Из этого мы можем сделать вывод, что BisectingKMeans лучше работает на данных, более сбалансированных по количеству элементов в кластерах. Также данный алгоритм целесообразнее использовать при количестве кластеров выше исследованного (в нашем случае выше 36).

Лучше всего себя проявил Word2Vec в связке с KMeans, потому что в результате было получено наибольшее число кластеров, а именно 34 из 36, содержащих уникальную информацию и осталось меньше всего кластеров, которые можно объединить.

В итоге используя только функции pySpark ml, мы провели кластеризацию sms‑сообщений. В результате получены 34 явно различающихся кластера, каждый из которых содержит определенный класс сообщений. Функционала pySpark ml вполне хватает для выполнения типовых операций и, в большинстве случаев, нет необходимости использовать специализированные фреймворки для машинного обучения, обрабатывающие большие данные гораздо медленней.

Комментарии (4)


  1. kemsky
    31.05.2023 17:50

    Понятно, что это пример, поэтому он довольно дубовый.

    У меня есть вопрос по близкой задаче. Нужно кластеризовать пользовательский ввод (свести к словарю имен). Каждая строка это одно или несколько названий фирм/банков/организаций, с разделителями и без, часто с сокращениями, опечатками, неполные. Что можете посоветовать? Я пробовал DbScan с разными функциями расстояний типа левенштейна, но толку не много.


    1. kodokurtn
      31.05.2023 17:50

      попробуйте получать эмбеддинги с помощью моделей трансформеров https://www.sbert.net/docs/usage/semantic_textual_similarity.html, которые ищут семантическую близость, это занимает больше времени для анализа, но результаты получаются довольно хорошие, особенно в связке с umap, если подобрать размерность.


    1. NewTechAudit Автор
      31.05.2023 17:50

      Добрый день!

      DBSCAN с Левенштейном должен был выделить явные кластеры при их наличии. Если не пробовали, точно стоит поиграться с весами операций (например сделать дешевле добавление символа, чтобы сгруппировать сокращения). Конкретнее на вопрос без более подробного описания, что не так с результатами DBSCAN, ответить не можем.


      1. kemsky
        31.05.2023 17:50

        Спасибо. Допустим есть два банка: Capital One Bank N.A. и Bank of America. И примеры пользовательского ввода: "capital one", "cap one", "capital one bank", "capital one bank na", "capital one / bofa", "bofa", "bnk of america" и так далее, иногда есть разделитель, иногда нет. Надо весь этот ввод разобать и построить список банков. В реальности этих имен там очень много, это все банки, коллекторские организации, кредитные программы, названия магазинов и тп.