Команда VK Cloud перевела статью о правилах оформления кода в PySpark. Они не обязательны для исполнения, но помогут сделать ваш код более читабельным и удобным для последующих проверок и изменений.

Импортируйте в переменную


Это помогает не загрязнять глобальное пространство имен и избегать конфликтов методов из разных пакетов.

from pyspark.sql import functions as F
from pyspark.sql import types as T

Рефракторьте сложные логические операции


В сложных запросах трудно разобраться. Лучше разбить их на логические группы и потом объединить.

# Bad
F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')

# Good
has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != ''))
delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0)
has_registration = (F.col('currentRegistration').rlike('.+'))
is_delivered = (F.col('prod_status') == 'Delivered')
is_active = (has_registration | has_operator)

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')

В примере выше мы сгруппировали условие оператора в единое целое. Здесь же мы сначала отделили друг от друга и потом объединили статус доставки, регистрацию, активность и другие условия. В этом случае выражение F.when получается лаконичным.

Откажитесь от withColumnRenamed


Если возможно, используйте alias, а не withColumnRenamed.

#bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# good
df.select('key', F.col('comments').alias('num_comments'))

Откажитесь от withColumn


В некоторых случаях для преобразования типов можно отказаться от метода withColumn и использовать select, а потом cast.

# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

# good
df.select(F.col('comments').cast('double'))

Используйте lit(None)


Лучше добавить None, чем оставить поле пустым или указать NA. Тогда мы можем использовать утилиты вроде isNull вместо проверки пустых строк, нулевых значений, 'NA' и т. п.

# bad
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))

# good
df = df.withColumn('foo', F.lit(None))

Используйте явные операции join


# bad
flights = flights.join(aircraft, 'aircraft_id')
flights = flights.join(aircraft, 'aircraft_id', 'inner')

# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')

Избегайте right-операций join


# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')

# good
flights = flights.join(aircraft, 'aircraft_id', how='left')

Разберитесь с похожими столбцами


import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf.columns = ["a", "a"]

Reference 'a' is ambiguous, could be: a, a.

  • При соединении таблиц заранее избавляйтесь от похожих столбцов, если в DF есть столбцы с одинаковыми именами.
  • Если нам нужны оба столбца, лучше всего переименовать один из них до соединения.
  • Разбираться с проблемными столбцами всегда нужно до формирования датасета. После окончания преобразования вы уже не сможете их различить.
  • А еще не стоит задавать имена столбцов, чувствительные к регистру.

Используйте круглые скобки, чтобы избежать \ (явных разрывов строки)


# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
  df
  .filter(F.col('event') == 'executing')
  .filter(F.col('has_tests') == True)
  .drop('has_tests')
)

Не соединяйте преобразования разных типов в цепочку


# bad
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes

df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)

df = df.withColumn('boverc', F.col('b') / F.col('c'))

df = (
    df
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

Задавайте конкретные условия при обработке и упорядочивании значений Null


ignorenulls=True
F.asc_nulls_last('num')

Избегайте пустой функции PartitionBy


Не должно быть пустых окон, поскольку из-за них Spark объединяет все данные в одну партицию и это может чрезвычайно снизить производительность.

# bad
w = W.partitionBy()
df = df.select(F.sum('num').over(w).alias('sum'))

# good
df = df.agg(F.sum('num').alias('sum'))
<h2>Оконная функция |. Используйте либо фреймы row, либо фреймы range</h2>
# bad
w1 = W.partitionBy('key')
w2 = W.partitionBy('key').orderBy('num')

# good
w1 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, 0)
w2 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

Select для указания схемы


Используйте Select как операцию очистки для подготовки датафрейма, который будет использован:

  • в начале преобразования,
  • перед выводом результата.

Другие рекомендации


  1. Не включайте избыточные или ненужные столбцы в инструкцию select.
  2. Не используйте литеральные строки или целые числа в условиях фильтров, в новых значениях столбцов и т. п. Вместо этого присвойте их информативной переменной.
  3. Каждый раз, когда сценарием использования предусмотрено кэширование всех данных, рекомендуется использовать df.cache.count(). После завершения использования объекта выполните unpersist.
  4. По возможности не используйте циклы для датасетов.
  5. Отделяйте файл конфигурации Spark от кода.
  6. Отделяйте конфигурацию Spark Job с помощью файла .ini.
  7. Используйте класс JobContext, чтобы определять аккумуляторы, счетчики, общие данные и т. п.
  8. Отделяйте загрузку и сохранение данных от любой тематической или бизнес-логики.
  9. Не используйте функцию print для отладки, используйте логи и логирование.
Присоединяйтесь к телеграм-каналу «Данные на стероидах». В нем вы найдете все об инструментах и подходах к извлечению максимальной пользы из работы с данными: регулярные дайджесты, полезные статьи, а также анонсы конференций и вебинаров.

Комментарии (1)


  1. brake
    09.09.2023 06:46

    Не соединяйте преобразования разных типов в цепочку

    почему?

    Каждый раз, когда сценарием использования предусмотрено кэширование всех данных, рекомендуется использовать df.cache.count()

    Если так не сделать, а сделать df.cache() то что? Мы тут как бы «взламываем» ленивость Spark.

    Избегайте right-операций join

    если нет, то что за это будет?