Ремарка

Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark в Apache Atlas. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.

В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas.

Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие ...

Цель работы

Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas , подобно тому как это происходит в данной статье с Apache NIFI .

Тестовая задача для иллюстрации и парсинг плана в AST

Определим небольшой файл cars.csv со следующим содержанием:

model,manufacturer
Model S,Tesla
Model 3,Tesla
Mustang,Ford
Civic,Honda

И напишем даг выведем его логический план:

  val spark = SparkSession.builder()
    .appName("Logical Plan Example")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val carsCSV = spark
    .read
    .option("header", "true")
    .csv("src/main/resources/cars.csv")

  val carsSeq = List(
    ("i8", "BMW"),
    ("A4", "Audi"),
    ("911", "Porsche"),
    ("Corolla", "Toyota")
  ).toDF("model", "manufacturer")

  val unioncars = carsCSV.union(carsSeq)

  val resDF = unioncars
    .where(col("manufacturer") =!= "Audi")
    .select("model", "manufacturer")
    .withColumn("processedDDTM", lit(LocalDateTime.now()))

  val logicalPlan = resDF.queryExecution.logical

  println(logicalPlan)
/* вывод
    Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]
      +- Project [model#17, manufacturer#18]
         +- Filter NOT (manufacturer#18 = Audi)
            +- Union false, false
               :- Relation [model#17,manufacturer#18] csv
               +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
                  +- LocalRelation [_1#23, _2#24]
   */
}

Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST).

Для этого мы определим класс AST, который будет отражать структуру плана в формате, удобном для последующей обработки.

// Определение корневого класса или типа для всех узлов дерева
sealed trait Node {
  // Метод для получения имени узла на основе его типа
  def getName: String = this.getClass.toString
}

// Узел типа "Проект", содержащий последовательность столбцов
case class ProjectNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Project"
}

// Узел типа "Фильтр", содержащий условие фильтрации
case class FilterNode(condition: String) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Filter"
}

// Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку
case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Union"
}

// Узел типа "Логическое отношение", содержащий последовательность столбцов
case class LogicalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LogicalRelation"
}

case class LocalRelationNode(columns: Seq[String]) extends Node {
  override def getName: String = "LocalRelation"
}

// Узел типа "Локальное отношение", содержащий последовательность столбцов
case class LocalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LocalRelation"
}

// Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node,
// список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне)
case class AST(node: Node,
               children: Seq[AST],
               level_num: Int,
               levelExpr: String)

И напишем парсер из логического плана в AST

// Объект для парсинга логических планов в AST
object ParserAST {

  // Функция для преобразования логического плана в AST
  // Возвращает Option[AST], где None означает, что план не может быть преобразован
  private def parseAST(plan: LogicalPlan): Option[AST] = {

    // Рекурсивная функция для обхода логического плана и создания узлов AST
    // Параметры:
    // - logicalPlan: текущий логический план для обработки
    // - levelnum: уровень в дереве AST
    // - levelExpr: строковое представление уровня и индекса
    // Возвращает Option[AST], где None означает, что логический план не может быть преобразован
    def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {

      // Определение узла на основе типа логического плана
      val node: Option[Node] = logicalPlan match {
        case p: Project =>
          // Обработка узла типа Project и создание узла AST с именем "Project"
          val columns = p.projectList.map(_.sql)
          Some(ProjectNode(columns))
          
        case f: Filter =>
          // Обработка узла типа Filter и создание узла AST с именем "Filter"
          val condition = f.condition.sql
          Some(FilterNode(condition))
          
        case u: Union =>
          // Обработка узла типа Union и создание узла AST с именем "Union"
          val isAll = u.allowMissingCol
          val byName = u.byName
          Some(UnionNode(isAll, byName))
          
        case lr: LocalRelation =>
          // Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"
          val columns = lr.output.map(_.sql)
          Some(LocalRelationNode(columns))
          
        case lr: LogicalRelation =>
          // Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"
          val columns = lr.output.map(_.sql)
          Some(LogicalRelationNode(columns))
          
        case _ =>
          // Если логический план не совпадает ни с одним из известных типов, возвращаем None
          None
      }

      // Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей
      node.map { n =>
        // Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план
        val children = logicalPlan.children.zipWithIndex.flatMap {
          case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")
        }.toList
        // Создание узла AST с текущим узлом и его дочерними узлами
        AST(n, children, levelnum, levelExpr)
      }
    }

    // Запуск рекурсивного обхода с начальным уровнем и строковым представлением
    loop(plan, 1, "1_0")
  }

  // Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST
  implicit class parser(lp: LogicalPlan) {
    def AST(): Option[AST] = {
      parseAST(lp)
    }
  }
}

теперь можно получать AST следующим образом logicalPlan.AST().get

Определим сущности в Атласе для построения Lianage

таблица наследовательности в Apche Atlas
таблица наследовательности в Apche Atlas

Подобно тому, как в языках программирования на базе Java все классы наследуются от Object, в Apache Atlas все сущности наследуются от Referenceable. Однако построение lineage (линейности данных) происходит только для типов Process и DataSet. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset), то кнопка "Lineage" попросту не появится.

Кроме того, сам lineage строится на основе полей inputs и outputs для Process, аналогично и для DataSet. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.

Изначально моей целью было отразить преобразования, происходящие в Apache Spark, но структура Apache Atlas вынуждает окружать мои Process сущностями DataSet в полях inputs и outputs. Хотя меня изначально интересовали только Process, эти DataSet-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet пустым.

В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.

Сначала определим тип для DataSet.

  {
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_data_type", 
        "description": "A type inheriting from assets for Pico DataSet", 
        "superTypes": ["DataSet"],
        "attributeDefs": [],
        "relationshipDefs": [] 
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются.

  2. entityDefs:

    • Определяет сущности в системе.

    • name: Имя сущности, которая представляет тип данных.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых наследуется данная сущность.

    • attributeDefs: Пустой массив, так как атрибуты не указаны.

    • relationshipDefs: Пустой массив, так как связи не определены.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

{
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_process_type",
        "description": "A type inheriting from assets for Pico Spark abstraction",
        "superTypes": ["Process"],
        "attributeDefs": [
          {
            "name": "inputs",
            "description": "List of inputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          },
          {
            "name": "outputs",
            "description": "List of outputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          }
        ],
        "relationshipDefs": []
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.

  2. entityDefs:

    • Содержит определения сущностей.

    • name: Имя сущности, определяющей тип данных в контексте Pico Spark.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых сущность наследуется.

    • attributeDefs: Пустой массив, так как атрибуты не добавлены.

    • relationshipDefs: Пустой массив, так как связи не указаны.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

Для типа pico_spark_process_type я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.

В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.

Взаимодействие с Apache Atlas по REST

Простого описания сущностей недостаточно — их нужно передать в Apache Atlas. У Atlas есть обширное REST API для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:

curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \
     -H "Content-Type: application/json" \
     -H "Accept: application/json" \
     -d '{
           "enumDefs": [],
           "structDefs": [],
           "classificationDefs": [],
           "entityDefs": [
             {
               "name": "pico_spark_data_type",
               "description": "A type inheriting from assets for Pico DataSet",
               "superTypes": ["DataSet"],
               "attributeDefs": [],
               "relationshipDefs": []
             }
           ],
           "relationshipDefs": [],
           "businessMetadataDefs": []
         }'

создаю JSON файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType

  val atlasServerUrl = "http://localhost:21000/api/atlas/v2"
  val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)

def generatePicoSparkTypes(): Unit = {

  // Функция для чтения содержимого файла из ресурсов
  def readFileFromResources(fileName: String): String = {
    val source = Source.fromResource(fileName)
    try source.mkString
    finally source.close()
  }

  // Чтение JSON из файла ресурсов
  val jsonString = readFileFromResources("EntityTypes.json")

  // Попытка разобрать строку JSON в структуру данных
  val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)

  // Преобразование разобранного JSON в список объектов JSON
  val jsonObjects: Option[List[Json]] = parsedJson match {
    case Right(json) =>
      json.as[List[Json]] match {
        case Right(jsonArray) =>
          Some(jsonArray)
        case Left(error) =>
          // Обработка ошибки разбора массива JSON
          println(s"Error parsing JSON array: $error")
          None
      }
    case Left(error) =>
      // Обработка ошибки разбора JSON
      println(s"Error parsing JSON: $error")
      None
  }

  // Отправка каждого объекта JSON на сервер Atlas
  jsonObjects match {
    case Some(jsonArray) =>
      jsonArray.foreach { jsonBody =>
        // Создание POST-запроса для создания типа в Apache Atlas
        val createTypeRequest = basicRequest
          .method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса
          .header("Authorization", authHeader) // Заголовок авторизации
          .header("Content-Type", "application/json") // Заголовок типа содержимого
          .header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON
          .body(jsonBody.noSpaces) // Тело запроса с JSON-данными
          .response(asString) // Ожидание ответа в формате строки

        // Отправка запроса и вывод результата
        val response = createTypeRequest.send(backend)
        println(response.body) // Печать тела ответа
        println(response.code) // Печать кода ответа
      }
    case None =>
      // Сообщение, если JSON-объекты не были найдены
      println("No JSON objects found.")
  }

}

комментарии:

  1. readFileFromResources: Функция для чтения содержимого файла JSON из ресурсов.

  2. jsonString: Получение строки JSON из файла.

  3. parsedJson: Попытка разобрать строку JSON в структуру данных Json.

  4. jsonObjects: Преобразование разобранного JSON в список объектов JSON.

  5. jsonArray.foreach: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas.

  6. createTypeRequest: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas.

  7. response: Отправка запроса и вывод результата, включая тело ответа и код ответа.

теперь для создания всех энтити в Apache Atlas достаточно вызвать метод
generatePicoSparkTypes()


Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs и outputs. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:

EntityTypes в Apache Atlas
EntityTypes в Apache Atlas

как видим все EntityType созданы

Создаем DataSet Entity

Перед тем как создавать сущности процессов нужно создать сущности DataSet-тов, поскольку первые ссылаются на вторые

На данном уже определен pico_spark_data_type который отвечает за входные / выходные схемы данных.

Для начала определимся с двумя вспомогательными методами

/**
 * Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.
 *
 * @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.
 * @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.
 */
def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {

  jsonBody => {
    // Создание HTTP POST запроса для отправки JSON данных на сервер
    val createTypeRequest = basicRequest
      .method(Method.POST, uri"$atlasServerUrl/${postfix}")
      .header("Authorization", authHeader)
      .header("Content-Type", "application/json")
      .header("Accept", "application/json")
      .body(jsonBody)
      .response(asString)

    // Отправка запроса и получение ответа
    val response = createTypeRequest.send(backend)
    
    // Вывод тела ответа и кода статуса
    println(response.body)
    println(response.code)
  }
}

/**
 * Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.
 *
 * @param domain Домен, который будет использоваться в атрибутах сущностей.
 * @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.
 * @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.
 */
def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

  // Локальная функция для генерации и отправки сущностей данных Spark
  def generateEntities(ast: AST): Unit = {
    ast.children.foreach { inast =>
      // Формирование JSON тела для сущности данных Spark
      val jsonBody =
        f"""
           |{
           |  "entity": {
           |    "typeName": "pico_spark_data_type",
           |    "attributes": {
           |      "domain": "${domain}",
           |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "description": "A description for the spark_data"
           |    }
           |  }
           |}
           |""".stripMargin

      // Отправка сформированного JSON тела на сервер
      execJsonAtlas(jsonBody)
      
      // Рекурсивный вызов для обработки дочерних узлов
      generateEntities(inast)
    }
  }

  // Возвращаем функцию для генерации сущностей
  generateEntities
}

Пояснения

  • senderJsonToAtlasEndpoint: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа.

  • generateSparkDataEntities: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.

Напишем еще 2 метода для запуска формирования Linage В Atlas

/**
 * Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.
 *
 * @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 * @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.
 */
def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {

  // Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas
  val entitySender = senderJsonToAtlasEndpoint("entity")
  
  // Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas
  val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)

  // Создание базовых сущностей вывода и отправка их на сервер
  //ее реализацию опущу
  createBaseOutput(domain, entitySender)
  
  // Создание базовых сущностей ввода и отправка их на сервер
  //ее реализацию опущу
  createBaseInput(domain, entitySender)
  
  // Генерация и отправка сущностей данных Spark на основе AST
  sparkDataEntityGenerator(ast)
}

/**
 * Имплементация расширения для преобразования AST в сущности Apache Atlas.
 *
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 */
implicit class converter(ast: AST) {

  /**
   * Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.
   *
   * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
   */
  def EntityToAtlas(domain: String): Unit = {
    ASTToAtlasEntity(ast, domain, "")
  }
}

Пояснения:

  • ASTToAtlasEntity: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функцию entitySender.

  • EntityToAtlas: Это метод расширения (implicit class) для типа AST, который упрощает вызов метода ASTToAtlasEntity с дефолтным значением для topLevelExpr. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.

Теперь при запуске ast.EntityToAtlas("picoDomain")В атласе появляется data entity

скриншот с web UI
скриншот с web UI

так как DataSet Entity уже созданы, то можно создавать Process Entity сразу с заролнеными inputs и outputs, это важно поскольку сколько я не тыкалась в Api для обновления Entuty ничего не работало.

начнем с того что определим пачку методов:

  // Создает функцию для отправки сущностей в Apache Atlas
  // Использует функцию преобразования AST в JSON и функцию отправки JSON
  def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {
    // Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas
    (ast: AST, topLevelExpr: String) => {
      val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)
      execJsonAtlas(jsonBody)
    }
  }

  // Генерирует JSON для сущностей в Atlas на основе AST и уровня
  // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
  def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {
    (ast: AST, topLevelExpr: String) => {
      val node = ast.node

      // Создает список входных сущностей, если есть дочерние элементы
      val inputs = if (ast.children.nonEmpty) {
        ast.children.map(_.levelExpr).map { expr =>
          f"""
             |
             |{
             |  "typeName": "pico_spark_data_type",
             |  "uniqueAttributes": {
             |    "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"
             |  }
             |}
             |
             |""".stripMargin
        }.mkString(", ")
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_input@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Создает JSON для выходных сущностей, если задан topLevelExpr
      val output = if (topLevelExpr.nonEmpty) {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |      "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"
           |   }
           | }
           |""".stripMargin
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_output@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
      node match {
        case p: ProjectNode =>
          f"""
             |{
             |"entity": {
             |      "typeName": "pico_spark_project_type",
             |      "attributes": {
             |        "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",
             |        "name": "pico_project_${ast.levelExpr}",
             |        "description": "This is an project for the pico_spark_project_type",
             |        "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],
             |        "inputs":[ ${inputs} ],
             |        "outputs":[ ${output} ]
             |      }
             |    }
             |}
             |""".stripMargin
        case ...

      }
    }
  }

  // Создает функцию для генерации и отправки сущностей в Apache Atlas
  // Использует предоставленные функции для создания JSON и отправки его в Atlas
  def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

    def sparkDataEntitys(ast: AST): Unit = {
      ast.children.foreach { inast =>
        val jsonBody =
          f"""
             |{
             |  "entity": {
             |    "typeName": "pico_spark_data_type",
             |    "attributes": {
             |      "domain": "${domain}",
             |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "description": "A description for the spark_data"
             |    }
             |  }
             |}
             |""".stripMargin

        execJsonAtlas(jsonBody)
        sparkDataEntitys(inast)
      }
    }

    // Возвращает функцию, которая генерирует и отправляет сущности данных для Spark
    sparkDataEntitys
  }

Пояснения:

  • senderEntity: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки.

  • generatotrProcessEntity: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas.

  • generatorDataEntities: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.

И обновляем методы для работы с AST

   // Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
  def ASTToAtlasEntity(ast: AST, domain: String): Unit = {

    // Создает функцию отправки JSON-данных для сущностей в Apache Atlas
    val entitySender = senderJsonToAtlasEndpoint("entity")

    // Создает функцию для генерации квалифицированного имени
    val qualifiedName = generatorQualifiedName(domain)

    // Создает функцию для генерации JSON-сущностей для процессов
    val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)

    // Создает функцию для отправки JSON-данных сущностей в Atlas
    val sendEntity = senderEntity(generatorProcessEntity, entitySender)

    // Создает функцию для генерации данных сущностей и отправки их в Atlas
    val generateDataEntity = generatorDataEntities(domain, entitySender)

    // Обрабатывает один узел AST, отправляя его как сущность в Atlas
    def processNode(ast: AST, intopLevelExpr: String): Unit = {
      sendEntity(ast, intopLevelExpr)
    }

    // Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел
    def traverseAST(ast: AST, intopLevelExpr: String): Unit = {
      processNode(ast, intopLevelExpr)
      ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))
    }

    // Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas
    createBaseOutput(domain, entitySender)
    createBaseInput(domain, entitySender)

    // Генерирует данные сущностей для AST и отправляет их в Atlas
    generateDataEntity(ast)

    // Запускает рекурсивное прохождение AST
    traverseAST(ast, "")
  }

  // Обогащает класс AST функцией для преобразования его в сущности Apache Atlas
  implicit class converter(ast: AST) {

    // Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
    def EntityToAtlas(domain: String): Unit = {
      ASTToAtlasEntity(ast, domain)
    }

  }

Пояснения:

  • ASTToAtlasEntity: Основной метод, который:

    • Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.

    • Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.

    • Создает и отправляет базовые сущности (входные и выходные) в Atlas.

    • Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.

  • implicit class converter(ast: AST): Обогащает класс AST, добавляя метод для преобразования AST в сущности Apache Atlas.

    • EntityToAtlas: Использует метод ASTToAtlasEntity для преобразования текущего узла AST в сущности Atlas и отправки их в указанный домен.

Теперь после запуска В Apache Atlas таки появиться Linage

Что ж, на изначальный logical план вроде похоже

Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
   +- Filter NOT (manufacturer#18 = Audi)
      +- Union false, false
         :- Relation [model#17,manufacturer#18] csv
         +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
            +- LocalRelation [_1#23, _2#24]

P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas можно взять тут

Комментарии (1)


  1. dolfinus
    12.09.2024 18:23
    +1

    А что даст такое хранение плана dataframe в Atlas? В реальных ETL процессах постоянно происходит какие-то изменения, план сложно назвать постоянным. Задача была в изучении того, как планы меняются во времени?