Добрый день, уважаемые читатели! Не открою для большинства секрета, если скажу, что большая часть задач в материалах к учебным курсам сформулирована шаблонно. Какие-то вопросы в принципе могут представлять интерес, но очень оторваны от реальных потребностей бизнеса. Какие-то моменты выдернуты из книг, поэтому лучше знакомиться с ними, читая первоисточник. Но есть кейсы, которые на первый взгляд хоть и кажутся простыми и стереотипными, но, если присмотреться к ним более пристально, могут дать пищу для размышления. Вот на одной из таких полезных задач мне хотелось бы заострить внимание в данной заметке. Формулируется вопрос следующим образом: «Необходимо определить количество пар товаров в продуктовых чеках. Вывести 10 самых частых сочетаний». Пример, чек 1 содержит товар 1, товар 2, товар 3, а чек 2 -  товар 1, товар 2, товар 5. Следовательно, комбинация «товар 1, товар 2» встречается 2 раза, «товар 1 , товар 3» один раз и т.д.

В исходнике решать данный кейс предлагалось силами Python. Но реальная жизнь может потребовать от аналитика данных умения выполнять данное упражнение как с помощью SQL, так и Spark. Следовательно, рассмотрим три подхода, оставив за скобками разговора четвертый вариант – расчеты на платформах BI.

Вариант 1. Python

Вопрос не подкреплялся подходящим датасетом, поэтому я нашел массив данных на портале Kaggle. В нем 541909 записей, 25900 уникальных чеков и 4070 разных кодов товаров. Вы можете взять любой датасет, главное, чтобы в нем были два поля: номер чека и коды товаров либо их наименования. Данные можно сгенерировать и рандомно, но при этом нужно учитывать объем информации. На сформированном массиве в несколько сотен записей вы можете апробировать ваши решения, но вот смогут ли скрипты справиться с сотнями тысяч или даже миллионами строк, вопрос будет оставаться открытым.

Теперь можно приступить к описанию логики, которую затем мы постараемся отразить в коде. В каждом чеке могут встречаться дубликаты товарных кодов, от них нужно избавиться. Группируясь по кодам чеков, мы формируем массивы со списками покупок. Из элементов каждого такого массива составляем уникальные пары кодов. При этом учитываем важный нюанс, чтобы пары товаров не дублировались, сортируем коды в паре по возрастанию. Иначе мы неизбежно столкнемся с ситуацией, что «100-101» и «101-100» это разные варианты. Полученные пары товаров и количество этих пар нужно где-то хранить. В данном случае, на мой взгляд, оптимальнее всего использовать словарь, где ключом будет выступать кортеж из кодов. Один из возможных вариантов такого подхода.

df = pd.read_csv('/content/data_two_columns.csv')
df_group = df.groupby(by=['invoiceno'])['stockcode'].apply(set).reset_index(name='list_stockcode')
dict_couple_products_amount = {}

for current_set in df_group['list_stockcode']:
  if len(current_set)>1:
    list_combinations = [tuple(sorted(_)) for _ in list(itertools.combinations(current_set, 2))]
    for element_list_combinations in list_combinations:
      if dict_couple_products_amount.get(element_list_combinations) is None:
        dict_couple_products_amount[element_list_combinations] = 1
      else:
        dict_couple_products_amount[element_list_combinations] = dict_couple_products_amount[element_list_combinations] + 1

df_final = pd.DataFrame(dict_couple_products_amount.items(),columns=['couple_stockcode','count'])
df_final = df_final.sort_values(by=['count'], ascending=False)
df_final.head()

Можно, конечно, применить хеш-функцию для формирования ключей, но это автоматически приведет к усложнению решения. С полной версией ноутбука можно ознакомиться по указанной ссылке. Время заполнения словаря 45 секунд. Забегая вперед, хочу сказать, что это лучшее время по сравнению с другими вариантами.

Вариант 2. Spark (PySpark)

Код, написанный под данный инструмент аналитики, оказался самым медленным. Время работы либо около 2 минут либо более 4. Тестировал на платформах Google Colab и Community Databricks.

spark = SparkSession.builder.master('local[*]').appName("CoupleCodes").getOrCreate()

df = spark.read.format('csv') \
                             .options(inferSchema='true', delimiter=',', header='true') \
                             .load('/content/data_two_columns.csv')

df_group = df.groupBy('invoiceno').agg(F.collect_set('stockcode').alias('list_stockcode'))
df_group = df_group.filter(size(df_group.list_stockcode)>1)

def create_combinations_stockcode(list_stockcode:List)->List: 
 list_combinations = list(itertools.combinations(sorted(list_stockcode), 2))
 return [str(i[0])+', '+str(i[1]) for i in list_combinations]

combinations_stockcode_udf = udf(lambda x: create_combinations_stockcode(x),T.ArrayType(T.StringType()))
df_group_combinations = df_group.withColumn("combinations_stockcode", combinations_stockcode_udf(col("list_stockcode")))

df_group_combinations.select(explode(col('combinations_stockcode')).alias("couple_stockcode")) \
        .groupBy('couple_stockcode') \
        .count() \
        .sort(col("count").desc()) \
        .toPandas() \
        .head()

Краткие комментарии. Во-первых, я перевожу итоговый результат в датафрейм Pandas (toPandas().head()). Я сознательно пошел на такой шаг, чтобы улучшить отображение вывода, но практического смысла в этом нет. Во-вторых, пришлось применять функцию udf, внутри которой работает стандартная библиотека itertools, аналога которой я не нашел на PySpark. По факту в Spark перекочевала логика решения на Python. Исключением является только момент, где отрабатывает explode(), происходит разворот массива с парами кодов в колонку, а не их складирование в словарь. Но это может быть не оптимально! Мне кажется, что если лучше разобраться с возможностями PySpark, то можно написать более элегантное и быстрое решение. Но получилось, как получилось… В целом подход рабочий, но медленный.

Вариант 2. Spark (Scala)

Время - 5 минут. Тестировал только на Community Databricks. Тут без комментариев, так как в Scala не силен. Но опять же под подозрением функция udf.

val path = "/FileStore/tables/data_two_columns.csv"
val df = spark.read.option("delimiter", ",").option("header", "true").csv(path)

val df_group = df.groupBy("invoiceno").agg(collect_set("stockcode").as("list_stockcode"))

val df_group_filter = df_group.filter(size(df_group("list_stockcode"))>1)

val combinationsStockСodeUDF= udf((l: Array[String]) => l.sorted.toSeq.combinations(2).toList)
val df_group_combinations = df_group_filter.select(col("invoiceno"),combinationsStockСodeUDF(col("list_stockcode")).as("combinations_stockcode"))

val df_final = df_group_combinations.select(explode(col("combinations_stockcode")).as("couple_stockcode"))
df_final.groupBy("couple_stockcode").count().sort(col("count").desc).show(10)

Вариант 3. SQL

Несмотря на то, что большинство аналитиков довольно уверенно владеют языком запросов, данная задача может вызвать ряд затруднений. Дело все в том, что основная комбинаторика в SQL достигается за счет JOIN-ов. Если их использовать в лоб, тривиальный код не позволит обсчитать даже демонстрационные данные. Допустим, мы возьмем все  уникальные коды товаров. Чтобы получить пары применим CROSS JOIN c условием WHERE tbl1.stockcode < tbl2.stockcode, чтобы избавиться от повторов. Применяя декартово произведение на всем многообразии кодов, мы попадаем в ловушку. Образуется слишком много сочетаний, часть из которых не будет представлена в реальных чеках. Дальше к этому столбцу нужно будет делать LEFT JOIN по сложному условию так, чтобы и первый и второй код из пары были в сгруппированном массиве кодов товаров в чеке. Технически это можно достигнуть через функцию ANY. Что получается в итоге. Массив из пар кодов (больше 8 миллионов комбинаций) соединяется с другой таблицей, при этом в худшем случае можно ожидать более 800 соответствий для одной пары кодов. Выполнить до конца такой скрипт в разумное время у меня не получилось. Хотя маленькие блоки информации так анализировать вполне возможно.

Какой тогда выход? В принципе не нужно совсем отказываться от комбинаторики, но необходимо проводить ее на уровне чека, чтобы в пары кодов не попадали случайные сочетания. Ключевую роль здесь сыграют две не самые популярные функции ARRAY_AGG() и UNNEST() из арсенала PostgreSQL. Одна из них собирает значения столбца в массив при группировке, а вторая наоборот разворачивает элементы массива в строки. Работа данных функций не является уникальной, поэтому их аналоги должны быть и в других базах данных.

sql = """with tbl_no_duplicates as (select s.invoiceno, 
                                           s.stockcode 
                                    from sales as s 
                                    group by s.invoiceno, 
                                             s.stockcode),
      
              tbl_list_code_combinations as (select t.invoiceno, (select array_agg(concat(cast(t1.* as text),', ',cast(t2.* as text)))
                                                                  from unnest(array_agg(t.stockcode)) as t1 
                                                                                     cross join unnest(array_agg(t.stockcode)) as t2
                                                                  where t1.* < t2.*) as agg
                                             from tbl_no_duplicates as t
                                            group by t.invoiceno)

select unnest(t.agg) as couple_stockcode, count(*)
from tbl_list_code_combinations as t
group by  unnest(t.agg) 
order by count(*) desc
limit 10"""

Весь остальной код не требует особых комментариев. Остается лишь добавить, что скрипт полностью отработал за 55 секунд, лишь немного уступив варианту на Python. При этом важным преимуществом такого подхода является то, что результаты можно получить прямо в базе данных, минуя выгрузку.

На этом все. Всем здоровья, удачи и профессиональных успехов!

P.S. Раздел с SQL вызвал больше всего вопросов у читателей и это хорошо. Датасет это E-Commerce Data (Actual transactions from UK retailer) с Kaggle. Изначально индексы не применялись. Все присланные варианты я свел в один скрипт, так как, в принципе, они говорили об одном и том же. Для сравнения с уже представленным вариантом подготовил два ноутбука с PostgreSQL 10 и PostgreSQL 14. Время выполнения запросов получилось диаметрально противоположным для одинаковых скриптов (его можно увидеть, если открыть ноутбук через nbviewer). По всей видимости сказывается то, что разные версии БД строят разные планы запросов.

Комментарии (4)


  1. Akina
    27.03.2022 13:06
    +3

    Вопрос не подкреплялся подходящим датасетом, поэтому я нашел массив данных на портале Kaggle. В нем 541909 записей, 25900 уникальных чеков и 4070 разных кодов товаров. Вы можете взять любой датасет

    И Вы не нашли ничего лучше, чем последовать за авторами вопроса и точно так же оставить читателя без массива данных для воспроизведения написанного?

    Забегая вперед, хочу сказать, что это лучшее время по сравнению с другими вариантами.

    Ничего не могу сказать ни про Питон, ни про Спарк. Но вот SQL-вариант... он ужасен. И неудивительно, что он проиграл (справедливости ради следует отметить - не так уж и много) варианту на питоне. Хотя какие-то вшивые полмиллиона записей, без какой-либо обработки - да при правильно построенном запросе должно было уделать питоновский вариант приблизительно на порядок (да-да, не в два раза, и не в три - на порядок), даже при отсутствии подходящих индексов.


  1. DuD
    27.03.2022 13:43
    +2

    В Spark переходя на python вы попадаете как минимум на сериализацию/десериализацию данных. По сути вы получили скорость работы локального Python скрипта + время поднятия/запуска spark job.

    Spark вообще c local[*] имеет смысл запускать только для целей отладки. Spark это не локальная утилита для расчетов. Он про распределенные вычисления на кластерах и большие массивы данных. Так же там нужно озаботиться правильным шардированием данных по нодам, чтобы соблюдать data locality и избегать shuffle.

    Это как взять камаз для доставки спичечного коробка по высокоскоростной магистрали. Да, легковушка довезет быстрее. Вот только если вам этих спичек нужно будет привезти тонну, производительность камаза даже с его 80км/ч будет гораздо выше.


    1. sshikov
      27.03.2022 13:53

      А тут еще и udf, с которыми в pyspark плохо. Не верится что-то в такие измерения от слова совсем.


  1. itille
    28.03.2022 09:16
    +2

    В части SQL полная ахинея.

    SELECT sc1, sc2, Count(*) as kol FROM
    
    (
    
    SELECT a.invoiceno, a.stockcode as sc1, b.stockcode  as SC2
    
    FROM sales a JOIN sales b ON a.invoiceno=b.invoiceno AND a.stockcode < b.stockcode 
    
    GROUP BY a.invoiceno, a.stockcode, b.stockcode
    
    ) AA
    
    GROUP BY sc1, sc2
    
    ORDER BY kol DESC

    При соответствующих индексах на такой выборке будет около секунды