Ремарка
Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark
в Apache Atlas
. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.
В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas
.
Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие ...
Цель работы
Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas
, подобно тому как это происходит в данной статье с Apache NIFI
.
Тестовая задача для иллюстрации и парсинг плана в AST
Определим небольшой файл cars.csv
со следующим содержанием:
model,manufacturer
Model S,Tesla
Model 3,Tesla
Mustang,Ford
Civic,Honda
И напишем даг выведем его логический план:
val spark = SparkSession.builder()
.appName("Logical Plan Example")
.master("local")
.getOrCreate()
import spark.implicits._
val carsCSV = spark
.read
.option("header", "true")
.csv("src/main/resources/cars.csv")
val carsSeq = List(
("i8", "BMW"),
("A4", "Audi"),
("911", "Porsche"),
("Corolla", "Toyota")
).toDF("model", "manufacturer")
val unioncars = carsCSV.union(carsSeq)
val resDF = unioncars
.where(col("manufacturer") =!= "Audi")
.select("model", "manufacturer")
.withColumn("processedDDTM", lit(LocalDateTime.now()))
val logicalPlan = resDF.queryExecution.logical
println(logicalPlan)
/* вывод
Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
+- Filter NOT (manufacturer#18 = Audi)
+- Union false, false
:- Relation [model#17,manufacturer#18] csv
+- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
+- LocalRelation [_1#23, _2#24]
*/
}
Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST
).
Для этого мы определим класс AST
, который будет отражать структуру плана в формате, удобном для последующей обработки.
// Определение корневого класса или типа для всех узлов дерева
sealed trait Node {
// Метод для получения имени узла на основе его типа
def getName: String = this.getClass.toString
}
// Узел типа "Проект", содержащий последовательность столбцов
case class ProjectNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Project"
}
// Узел типа "Фильтр", содержащий условие фильтрации
case class FilterNode(condition: String) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Filter"
}
// Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку
case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Union"
}
// Узел типа "Логическое отношение", содержащий последовательность столбцов
case class LogicalRelationNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "LogicalRelation"
}
case class LocalRelationNode(columns: Seq[String]) extends Node {
override def getName: String = "LocalRelation"
}
// Узел типа "Локальное отношение", содержащий последовательность столбцов
case class LocalRelationNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "LocalRelation"
}
// Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node,
// список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне)
case class AST(node: Node,
children: Seq[AST],
level_num: Int,
levelExpr: String)
И напишем парсер из логического плана в AST
// Объект для парсинга логических планов в AST
object ParserAST {
// Функция для преобразования логического плана в AST
// Возвращает Option[AST], где None означает, что план не может быть преобразован
private def parseAST(plan: LogicalPlan): Option[AST] = {
// Рекурсивная функция для обхода логического плана и создания узлов AST
// Параметры:
// - logicalPlan: текущий логический план для обработки
// - levelnum: уровень в дереве AST
// - levelExpr: строковое представление уровня и индекса
// Возвращает Option[AST], где None означает, что логический план не может быть преобразован
def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {
// Определение узла на основе типа логического плана
val node: Option[Node] = logicalPlan match {
case p: Project =>
// Обработка узла типа Project и создание узла AST с именем "Project"
val columns = p.projectList.map(_.sql)
Some(ProjectNode(columns))
case f: Filter =>
// Обработка узла типа Filter и создание узла AST с именем "Filter"
val condition = f.condition.sql
Some(FilterNode(condition))
case u: Union =>
// Обработка узла типа Union и создание узла AST с именем "Union"
val isAll = u.allowMissingCol
val byName = u.byName
Some(UnionNode(isAll, byName))
case lr: LocalRelation =>
// Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"
val columns = lr.output.map(_.sql)
Some(LocalRelationNode(columns))
case lr: LogicalRelation =>
// Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"
val columns = lr.output.map(_.sql)
Some(LogicalRelationNode(columns))
case _ =>
// Если логический план не совпадает ни с одним из известных типов, возвращаем None
None
}
// Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей
node.map { n =>
// Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план
val children = logicalPlan.children.zipWithIndex.flatMap {
case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")
}.toList
// Создание узла AST с текущим узлом и его дочерними узлами
AST(n, children, levelnum, levelExpr)
}
}
// Запуск рекурсивного обхода с начальным уровнем и строковым представлением
loop(plan, 1, "1_0")
}
// Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST
implicit class parser(lp: LogicalPlan) {
def AST(): Option[AST] = {
parseAST(lp)
}
}
}
теперь можно получать AST
следующим образом logicalPlan.AST().get
Определим сущности в Атласе для построения Lianage
Подобно тому, как в языках программирования на базе Java все классы наследуются от Object
, в Apache Atlas все сущности наследуются от Referenceable
. Однако построение lineage (линейности данных) происходит только для типов Process
и DataSet
. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset
), то кнопка "Lineage" попросту не появится.
Кроме того, сам lineage строится на основе полей inputs
и outputs
для Process
, аналогично и для DataSet
. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.
Изначально моей целью было отразить преобразования, происходящие в Apache Spark
, но структура Apache Atlas
вынуждает окружать мои Process
сущностями DataSet
в полях inputs
и outputs
. Хотя меня изначально интересовали только Process
, эти DataSet
-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet
пустым.
В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.
Сначала определим тип для DataSet
.
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_data_type",
"description": "A type inheriting from assets for Pico DataSet",
"superTypes": ["DataSet"],
"attributeDefs": [],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}
Комментарии:
-
enumDefs
,structDefs
,classificationDefs
:Пустые массивы, так как перечисления, структуры и классификации не используются.
-
entityDefs
:Определяет сущности в системе.
name
: Имя сущности, которая представляет тип данных.description
: Описание сущности.superTypes
: Суперклассы, от которых наследуется данная сущность.attributeDefs
: Пустой массив, так как атрибуты не указаны.relationshipDefs
: Пустой массив, так как связи не определены.
-
relationshipDefs
,businessMetadataDefs
:Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_process_type",
"description": "A type inheriting from assets for Pico Spark abstraction",
"superTypes": ["Process"],
"attributeDefs": [
{
"name": "inputs",
"description": "List of inputs for the process",
"typeName": "array<pico_spark_data_type>",
"isOptional": true
},
{
"name": "outputs",
"description": "List of outputs for the process",
"typeName": "array<pico_spark_data_type>",
"isOptional": true
}
],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}
Комментарии:
-
enumDefs
,structDefs
,classificationDefs
:Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.
-
entityDefs
:Содержит определения сущностей.
name
: Имя сущности, определяющей тип данных в контексте Pico Spark.description
: Описание сущности.superTypes
: Суперклассы, от которых сущность наследуется.attributeDefs
: Пустой массив, так как атрибуты не добавлены.relationshipDefs
: Пустой массив, так как связи не указаны.
-
relationshipDefs
,businessMetadataDefs
:Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
Для типа pico_spark_process_type
я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST
. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.
В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.
Взаимодействие с Apache Atlas по REST
Простого описания сущностей недостаточно — их нужно передать в Apache Atlas
. У Atlas есть обширное REST API
для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:
curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-d '{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_data_type",
"description": "A type inheriting from assets for Pico DataSet",
"superTypes": ["DataSet"],
"attributeDefs": [],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}'
создаю JSON
файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType
val atlasServerUrl = "http://localhost:21000/api/atlas/v2"
val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)
def generatePicoSparkTypes(): Unit = {
// Функция для чтения содержимого файла из ресурсов
def readFileFromResources(fileName: String): String = {
val source = Source.fromResource(fileName)
try source.mkString
finally source.close()
}
// Чтение JSON из файла ресурсов
val jsonString = readFileFromResources("EntityTypes.json")
// Попытка разобрать строку JSON в структуру данных
val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)
// Преобразование разобранного JSON в список объектов JSON
val jsonObjects: Option[List[Json]] = parsedJson match {
case Right(json) =>
json.as[List[Json]] match {
case Right(jsonArray) =>
Some(jsonArray)
case Left(error) =>
// Обработка ошибки разбора массива JSON
println(s"Error parsing JSON array: $error")
None
}
case Left(error) =>
// Обработка ошибки разбора JSON
println(s"Error parsing JSON: $error")
None
}
// Отправка каждого объекта JSON на сервер Atlas
jsonObjects match {
case Some(jsonArray) =>
jsonArray.foreach { jsonBody =>
// Создание POST-запроса для создания типа в Apache Atlas
val createTypeRequest = basicRequest
.method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса
.header("Authorization", authHeader) // Заголовок авторизации
.header("Content-Type", "application/json") // Заголовок типа содержимого
.header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON
.body(jsonBody.noSpaces) // Тело запроса с JSON-данными
.response(asString) // Ожидание ответа в формате строки
// Отправка запроса и вывод результата
val response = createTypeRequest.send(backend)
println(response.body) // Печать тела ответа
println(response.code) // Печать кода ответа
}
case None =>
// Сообщение, если JSON-объекты не были найдены
println("No JSON objects found.")
}
}
комментарии:
readFileFromResources
: Функция для чтения содержимого файла JSON из ресурсов.jsonString
: Получение строки JSON из файла.parsedJson
: Попытка разобрать строку JSON в структуру данныхJson
.jsonObjects
: Преобразование разобранного JSON в список объектов JSON.jsonArray.foreach
: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas.createTypeRequest
: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas.response
: Отправка запроса и вывод результата, включая тело ответа и код ответа.
теперь для создания всех энтити в Apache Atlas
достаточно вызвать метод generatePicoSparkTypes()
Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs
и outputs
. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:
как видим все EntityType
созданы
Создаем DataSet Entity
Перед тем как создавать сущности процессов нужно создать сущности DataSet-тов, поскольку первые ссылаются на вторые
На данном уже определен pico_spark_data_type
который отвечает за входные / выходные схемы данных.
Для начала определимся с двумя вспомогательными методами
/**
* Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.
*
* @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.
* @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.
*/
def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {
jsonBody => {
// Создание HTTP POST запроса для отправки JSON данных на сервер
val createTypeRequest = basicRequest
.method(Method.POST, uri"$atlasServerUrl/${postfix}")
.header("Authorization", authHeader)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(jsonBody)
.response(asString)
// Отправка запроса и получение ответа
val response = createTypeRequest.send(backend)
// Вывод тела ответа и кода статуса
println(response.body)
println(response.code)
}
}
/**
* Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.
*
* @param domain Домен, который будет использоваться в атрибутах сущностей.
* @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.
* @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.
*/
def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {
// Локальная функция для генерации и отправки сущностей данных Spark
def generateEntities(ast: AST): Unit = {
ast.children.foreach { inast =>
// Формирование JSON тела для сущности данных Spark
val jsonBody =
f"""
|{
| "entity": {
| "typeName": "pico_spark_data_type",
| "attributes": {
| "domain": "${domain}",
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "description": "A description for the spark_data"
| }
| }
|}
|""".stripMargin
// Отправка сформированного JSON тела на сервер
execJsonAtlas(jsonBody)
// Рекурсивный вызов для обработки дочерних узлов
generateEntities(inast)
}
}
// Возвращаем функцию для генерации сущностей
generateEntities
}
Пояснения
senderJsonToAtlasEndpoint
: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа.generateSparkDataEntities
: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.
Напишем еще 2 метода для запуска формирования Linage В Atlas
/**
* Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.
*
* @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
* @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.
*/
def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {
// Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas
val entitySender = senderJsonToAtlasEndpoint("entity")
// Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas
val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)
// Создание базовых сущностей вывода и отправка их на сервер
//ее реализацию опущу
createBaseOutput(domain, entitySender)
// Создание базовых сущностей ввода и отправка их на сервер
//ее реализацию опущу
createBaseInput(domain, entitySender)
// Генерация и отправка сущностей данных Spark на основе AST
sparkDataEntityGenerator(ast)
}
/**
* Имплементация расширения для преобразования AST в сущности Apache Atlas.
*
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
*/
implicit class converter(ast: AST) {
/**
* Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.
*
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
*/
def EntityToAtlas(domain: String): Unit = {
ASTToAtlasEntity(ast, domain, "")
}
}
Пояснения:
ASTToAtlasEntity
: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функциюentitySender
.EntityToAtlas
: Это метод расширения (implicit class) для типаAST
, который упрощает вызов методаASTToAtlasEntity
с дефолтным значением дляtopLevelExpr
. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.
Теперь при запуске ast.EntityToAtlas("picoDomain")
В атласе появляется data entity
так как DataSet
Entity
уже созданы, то можно создавать Process
Entity
сразу с заролнеными inputs
и outputs
, это важно поскольку сколько я не тыкалась в Api для обновления Entuty
ничего не работало.
начнем с того что определим пачку методов:
// Создает функцию для отправки сущностей в Apache Atlas
// Использует функцию преобразования AST в JSON и функцию отправки JSON
def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {
// Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas
(ast: AST, topLevelExpr: String) => {
val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)
execJsonAtlas(jsonBody)
}
}
// Генерирует JSON для сущностей в Atlas на основе AST и уровня
// Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {
(ast: AST, topLevelExpr: String) => {
val node = ast.node
// Создает список входных сущностей, если есть дочерние элементы
val inputs = if (ast.children.nonEmpty) {
ast.children.map(_.levelExpr).map { expr =>
f"""
|
|{
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"
| }
|}
|
|""".stripMargin
}.mkString(", ")
} else {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_input@${domain}"
| }
| }
|""".stripMargin
}
// Создает JSON для выходных сущностей, если задан topLevelExpr
val output = if (topLevelExpr.nonEmpty) {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"
| }
| }
|""".stripMargin
} else {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_output@${domain}"
| }
| }
|""".stripMargin
}
// Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
node match {
case p: ProjectNode =>
f"""
|{
|"entity": {
| "typeName": "pico_spark_project_type",
| "attributes": {
| "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",
| "name": "pico_project_${ast.levelExpr}",
| "description": "This is an project for the pico_spark_project_type",
| "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],
| "inputs":[ ${inputs} ],
| "outputs":[ ${output} ]
| }
| }
|}
|""".stripMargin
case ...
}
}
}
// Создает функцию для генерации и отправки сущностей в Apache Atlas
// Использует предоставленные функции для создания JSON и отправки его в Atlas
def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {
def sparkDataEntitys(ast: AST): Unit = {
ast.children.foreach { inast =>
val jsonBody =
f"""
|{
| "entity": {
| "typeName": "pico_spark_data_type",
| "attributes": {
| "domain": "${domain}",
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "description": "A description for the spark_data"
| }
| }
|}
|""".stripMargin
execJsonAtlas(jsonBody)
sparkDataEntitys(inast)
}
}
// Возвращает функцию, которая генерирует и отправляет сущности данных для Spark
sparkDataEntitys
}
Пояснения:
senderEntity
: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки.generatotrProcessEntity
: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas.generatorDataEntities
: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.
И обновляем методы для работы с AST
// Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
def ASTToAtlasEntity(ast: AST, domain: String): Unit = {
// Создает функцию отправки JSON-данных для сущностей в Apache Atlas
val entitySender = senderJsonToAtlasEndpoint("entity")
// Создает функцию для генерации квалифицированного имени
val qualifiedName = generatorQualifiedName(domain)
// Создает функцию для генерации JSON-сущностей для процессов
val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)
// Создает функцию для отправки JSON-данных сущностей в Atlas
val sendEntity = senderEntity(generatorProcessEntity, entitySender)
// Создает функцию для генерации данных сущностей и отправки их в Atlas
val generateDataEntity = generatorDataEntities(domain, entitySender)
// Обрабатывает один узел AST, отправляя его как сущность в Atlas
def processNode(ast: AST, intopLevelExpr: String): Unit = {
sendEntity(ast, intopLevelExpr)
}
// Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел
def traverseAST(ast: AST, intopLevelExpr: String): Unit = {
processNode(ast, intopLevelExpr)
ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))
}
// Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas
createBaseOutput(domain, entitySender)
createBaseInput(domain, entitySender)
// Генерирует данные сущностей для AST и отправляет их в Atlas
generateDataEntity(ast)
// Запускает рекурсивное прохождение AST
traverseAST(ast, "")
}
// Обогащает класс AST функцией для преобразования его в сущности Apache Atlas
implicit class converter(ast: AST) {
// Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
def EntityToAtlas(domain: String): Unit = {
ASTToAtlasEntity(ast, domain)
}
}
Пояснения:
-
ASTToAtlasEntity
: Основной метод, который:Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.
Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.
Создает и отправляет базовые сущности (входные и выходные) в Atlas.
Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.
-
implicit class converter(ast: AST)
: Обогащает классAST
, добавляя метод для преобразования AST в сущности Apache Atlas.EntityToAtlas
: Использует методASTToAtlasEntity
для преобразования текущего узлаAST
в сущности Atlas и отправки их в указанный домен.
Теперь после запуска В Apache Atlas таки появиться Linage
Что ж, на изначальный logical план вроде похоже
Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
+- Filter NOT (manufacturer#18 = Audi)
+- Union false, false
:- Relation [model#17,manufacturer#18] csv
+- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
+- LocalRelation [_1#23, _2#24]
P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas
можно взять тут
dolfinus
А что даст такое хранение плана dataframe в Atlas? В реальных ETL процессах постоянно происходит какие-то изменения, план сложно назвать постоянным. Задача была в изучении того, как планы меняются во времени?