Команда VK Cloud перевела статью о правилах оформления кода в PySpark. Они не обязательны для исполнения, но помогут сделать ваш код более читабельным и удобным для последующих проверок и изменений.
Импортируйте в переменную
Это помогает не загрязнять глобальное пространство имен и избегать конфликтов методов из разных пакетов.
from pyspark.sql import functions as F
from pyspark.sql import types as T
Рефракторьте сложные логические операции
В сложных запросах трудно разобраться. Лучше разбить их на логические группы и потом объединить.
# Bad
F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')
# Good
has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != ''))
delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0)
has_registration = (F.col('currentRegistration').rlike('.+'))
is_delivered = (F.col('prod_status') == 'Delivered')
is_active = (has_registration | has_operator)
F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
В примере выше мы сгруппировали условие оператора в единое целое. Здесь же мы сначала отделили друг от друга и потом объединили статус доставки, регистрацию, активность и другие условия. В этом случае выражение
F.when
получается лаконичным.Откажитесь от withColumnRenamed
Если возможно, используйте alias, а не
withColumnRenamed
.#bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')
# good
df.select('key', F.col('comments').alias('num_comments'))
Откажитесь от withColumn
В некоторых случаях для преобразования типов можно отказаться от метода
withColumn
и использовать select
, а потом cast
.# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))
# good
df.select(F.col('comments').cast('double'))
Используйте lit(None)
Лучше добавить
None
, чем оставить поле пустым или указать NA
. Тогда мы можем использовать утилиты вроде isNull
вместо проверки пустых строк, нулевых значений, 'NA'
и т. п.# bad
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))
# good
df = df.withColumn('foo', F.lit(None))
Используйте явные операции join
# bad
flights = flights.join(aircraft, 'aircraft_id')
flights = flights.join(aircraft, 'aircraft_id', 'inner')
# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')
Избегайте right-операций join
# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')
# good
flights = flights.join(aircraft, 'aircraft_id', how='left')
Разберитесь с похожими столбцами
import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.
- При соединении таблиц заранее избавляйтесь от похожих столбцов, если в DF есть столбцы с одинаковыми именами.
- Если нам нужны оба столбца, лучше всего переименовать один из них до соединения.
- Разбираться с проблемными столбцами всегда нужно до формирования датасета. После окончания преобразования вы уже не сможете их различить.
- А еще не стоит задавать имена столбцов, чувствительные к регистру.
Используйте круглые скобки, чтобы избежать \ (явных разрывов строки)
# bad
df = df.filter(F.col('event') == 'executing')\
.filter(F.col('has_tests') == True)\
.drop('has_tests')
# good
df = (
df
.filter(F.col('event') == 'executing')
.filter(F.col('has_tests') == True)
.drop('has_tests')
)
Не соединяйте преобразования разных типов в цепочку
# bad
df = (
df
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
.withColumn('boverc', F.col('b') / F.col('c'))
.join(df2, 'key', how='inner')
.join(df3, 'key', how='left')
.drop('c')
)
# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes
df = (
df
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
)
df = df.withColumn('boverc', F.col('b') / F.col('c'))
df = (
df
.join(df2, 'key', how='inner')
.join(df3, 'key', how='left')
.drop('c')
)
Задавайте конкретные условия при обработке и упорядочивании значений Null
ignorenulls=True
F.asc_nulls_last('num')
Избегайте пустой функции PartitionBy
Не должно быть пустых окон, поскольку из-за них Spark объединяет все данные в одну партицию и это может чрезвычайно снизить производительность.
# bad
w = W.partitionBy()
df = df.select(F.sum('num').over(w).alias('sum'))
# good
df = df.agg(F.sum('num').alias('sum'))
<h2>Оконная функция |. Используйте либо фреймы row, либо фреймы range</h2>
# bad
w1 = W.partitionBy('key')
w2 = W.partitionBy('key').orderBy('num')
# good
w1 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, 0)
w2 = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
Select для указания схемы
Используйте Select как операцию очистки для подготовки датафрейма, который будет использован:
- в начале преобразования,
- перед выводом результата.
Другие рекомендации
- Не включайте избыточные или ненужные столбцы в инструкцию
select
.
- Не используйте литеральные строки или целые числа в условиях фильтров, в новых значениях столбцов и т. п. Вместо этого присвойте их информативной переменной.
- Каждый раз, когда сценарием использования предусмотрено кэширование всех данных, рекомендуется использовать
df.cache.count()
. После завершения использования объекта выполнитеunpersist
.
- По возможности не используйте циклы для датасетов.
- Отделяйте файл конфигурации Spark от кода.
- Отделяйте конфигурацию Spark Job с помощью файла .ini.
- Используйте класс
JobContext
, чтобы определять аккумуляторы, счетчики, общие данные и т. п.
- Отделяйте загрузку и сохранение данных от любой тематической или бизнес-логики.
- Не используйте функцию
print
для отладки, используйте логи и логирование.
Присоединяйтесь к телеграм-каналу «Данные на стероидах». В нем вы найдете все об инструментах и подходах к извлечению максимальной пользы из работы с данными: регулярные дайджесты, полезные статьи, а также анонсы конференций и вебинаров.
brake
почему?
Если так не сделать, а сделать df.cache() то что? Мы тут как бы «взламываем» ленивость Spark.
если нет, то что за это будет?