Команда VK Cloud перевела статью о правилах оформления кода в PySpark. Они не обязательны для исполнения, но помогут сделать ваш код более читабельным и удобным для последующих проверок и изменений.

Импортируйте в переменную


Это помогает не загрязнять глобальное пространство имен и избегать конфликтов методов из разных пакетов.

from pyspark.sql import functions as F
from pyspark.sql import types as T

Рефракторьте сложные логические операции


В сложных запросах трудно разобраться. Лучше разбить их на логические группы и потом объединить.

# Bad
F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')

# Good
has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != ''))
delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0)
has_registration = (F.col('currentRegistration').rlike('.+'))
is_delivered = (F.col('prod_status') == 'Delivered')
is_active = (has_registration | has_operator)

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')

В примере выше мы сгруппировали условие оператора в единое целое. Здесь же мы сначала отделили друг от друга и потом объединили статус доставки, регистрацию, активность и другие условия. В этом случае выражение F.when получается лаконичным.

Откажитесь от withColumnRenamed


Если возможно, используйте alias, а не withColumnRenamed.

#bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# good
df.select('key', F.col('comments').alias('num_comments'))

Откажитесь от withColumn


В некоторых случаях для преобразования типов можно отказаться от метода withColumn и использовать select, а потом cast.

# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

# good
df.select(F.col('comments').cast('double'))

Используйте lit(None)


Лучше добавить None, чем оставить поле пустым или указать NA. Тогда мы можем использовать утилиты вроде isNull вместо проверки пустых строк, нулевых значений, 'NA' и т. п.

# bad
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))

# good
df = df.withColumn('foo', F.lit(None))

Используйте явные операции join


# bad
flights = flights.join(aircraft, 'aircraft_id')
flights = flights.join(aircraft, 'aircraft_id', 'inner')

# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')

Избегайте right-операций join


# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')

# good
flights = flights.join(aircraft, 'aircraft_id', how='left')

Разберитесь с похожими столбцами


import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf.columns = ["a", "a"]

Reference 'a' is ambiguous, could be: a, a.

  • При соединении таблиц заранее избавляйтесь от похожих столбцов, если в DF есть столбцы с одинаковыми именами.
  • Если нам нужны оба столбца, лучше всего переименовать один из них до соединения.
  • Разбираться с проблемными столбцами всегда нужно до формирования датасета. После окончания преобразования вы уже не сможете их различить.
  • А еще не стоит задавать имена столбцов, чувствительные к регистру.

Используйте круглые скобки, чтобы избежать \ (явных разрывов строки)


# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
  df
  .filter(F.col('event') == 'executing')
  .filter(F.col('has_tests') == True)
  .drop('has_tests')
)

Не соединяйте преобразования разных типов в цепочку


# bad
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes

df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)

df = df.withColumn('boverc', F.col('b') / F.col('c'))

df = (
    df
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

Задавайте конкретные условия при обработке и упорядочивании значений Null


ignorenulls=True
F.asc_nulls_last('num')

Избегайте пустой функции PartitionBy


Не должно быть пустых окон, поскольку из-за них Spark объединяет все данные в одну партицию и это может чрезвычайно снизить производительность.

# bad
w = W.partitionBy()
df = df.select(F.sum('num').over(w).alias('sum'))

# good
df = df.agg(F.sum('num').alias('sum'))
<h2>Оконная функция |. Используйте либо фреймы row, либо фреймы range</h2>
# bad
w1 = W.partitionBy('key')
w2 = W.partitionBy('key').orderBy('num')

# good
w1 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, 0)
w2 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

Select для указания схемы


Используйте Select как операцию очистки для подготовки датафрейма, который будет использован:

  • в начале преобразования,
  • перед выводом результата.

Другие рекомендации


  1. Не включайте избыточные или ненужные столбцы в инструкцию select.
  2. Не используйте литеральные строки или целые числа в условиях фильтров, в новых значениях столбцов и т. п. Вместо этого присвойте их информативной переменной.
  3. Каждый раз, когда сценарием использования предусмотрено кэширование всех данных, рекомендуется использовать df.cache.count(). После завершения использования объекта выполните unpersist.
  4. По возможности не используйте циклы для датасетов.
  5. Отделяйте файл конфигурации Spark от кода.
  6. Отделяйте конфигурацию Spark Job с помощью файла .ini.
  7. Используйте класс JobContext, чтобы определять аккумуляторы, счетчики, общие данные и т. п.
  8. Отделяйте загрузку и сохранение данных от любой тематической или бизнес-логики.
  9. Не используйте функцию print для отладки, используйте логи и логирование.
Сейчас наша команда готовит к релизу облачный сервис Spark в K8s, подписывайтесь на телеграм-канал «Данные на стероидах», чтобы не пропустить анонс!

Комментарии (1)


  1. vba
    12.09.2023 12:49

    Нда, вроде перевод этого гайда уже есть где-то на хабре. Меня занятно улыбнуло:

    # good
    df = (
      df .... 
    )
    

    А что хорошего в экономии переменных? Это как то повышает читаемость кода, когда вы постоянно перезаписываете одну и ту же переменную, да еще с таким говорящим названием df?