В предыдущей статье я рассказал, как организовать систему распределенного машинного обучения на GPU NVidia, используя язык Java с фреймворками Spring, Spark ML, XGBoost, DJL в standalone кластере Spark. Особенностью поставленной задачи являлось организация системы под управлением ОС Windows 10 Pro, в Docker-контейнерах. Эксперимент оказался не вполне успешным. В данной статье я покажу, как воспользоваться имеющимися наработками и запустить Spark Jobs в Kubernetes в режимах client и cluster, опишу особенности работы с Cassandra в Spark, покажу пример обучения модели и ее дальнейшего использования. В этот раз буду использовать язык Kotlin. Репозиторий с кодом доступен на GitLab.

Данная статья представляет интерес для тех, кто интересуется системами Big Data и стремится создать систему, позволяющую, в том числе, выполнять задачи распределенного машинного обучения на Spark в Kubernetes, используя GPU NVidia и Cassandra для хранения данных.

Версии библиотек и фреймворков

Первое, что нужно принять во внимание при построении подобной системы - она довольно сложная, с рядом элементов, которые не совместимы друг с другом. Так как используется Rapids, использовать Boot 3 не получится - для него требуется Java 17, и, хоть Spark и поддерживает данную версию, ускоритель GPU-вычислений Rapids на момент написания статьи не поддерживает данную версию в полной мере. Скорее всего, Java 17 будет добавлена в версии, которая также обзаведется поддержкой Spark 3.4.0. По этой причине, последний Spark (на момент написания статьи 3.4.0) брать за основу не стоит. К тому же, DataStax так же не обновила свой cassandra connector, и данная версия не поддерживается.

Версии компонентов, с которыми система заработала:

  • JDK 8

  • Spring Boot 2.7.11

  • Spark 3.3.2

  • NVidia Rapids 23.04.0

  • Cassandra 4.1.1

  • PostgreSQL 15.3-1.pgdg110+1

  • scala-library 2.12.15

  • spark-cassandra-connector_2.12 3.3.0

  • com.fasterxml.jackson.core 2.13.5

На уровне инфраструктуры:

  • Kubernetes 1.26.3

  • ContainerD 1.7.0

  • NVidia GPU Operator 0.13.0

  • NVidia Driver 530.30.02-1 (ставится с CUDA)

  • CUDA - желательно не ниже 11.8 в базовом образе NVidia, в последних же драйверах по умолчанию 12+

  • Ubuntu 22.04.2

Конфигурация стенда:

Узел

CPU

RAM

GPU

Адрес в ЛВС

master1

Intel i7-2700k

16 Gb

NVidia GTX 1650 4 Gb

192.168.0.150

worker1

AMD 3800x

32 Gb

NVidia RTX 2600 6 Gb

192.168.0.125

Особенности

  1. Не использовать spring-boot-starter-parent. Он тянет за собой множество библиотек, которые конфликтуют с библиотеками Spark. В том числе, для последующего перехода на Java 17 (выпустят же когда-нибудь разработчики Rapids библиотеку, поддерживающую данную версию) лучше не использовать Tomcat, и использовать Undertow (на момент написания статьи были проблемы с classloader'ами Rapids и Tomcat, а на Undertow успешно запустилось). Но и тут есть особенности: необходимо подключить корректную версию Jackson (указана выше) и ряд библиотек jakarta и javax servlet:

pom.xml
<dependency>
  <groupId>org.glassfish.web</groupId>
  <artifactId>jakarta.servlet.jsp.jstl</artifactId>
  <version>3.0.1</version>
</dependency>
<dependency>
  <groupId>jakarta.servlet</groupId>
  <artifactId>jakarta.servlet-api</artifactId>
  <version>6.0.0</version>
  <scope>provided</scope>
</dependency>
  <dependency>
  <groupId>jakarta.servlet.jsp.jstl</groupId>
  <artifactId>jakarta.servlet.jsp.jstl-api</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>javax.servlet</groupId>
  <artifactId>javax.servlet-api</artifactId>
  <version>4.0.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-classic</artifactId>
  <version>1.2.9</version>
</dependency>

  1. Подключить scala-library 2.12.15. Зависимость spark-cassandra-connector_2.12 3.3.0 использует версию 2.12.11, а Spark 3.3.2 - 2.12.15. Если не заменить, будут конфликты SerialVersionUID. Чтобы заменить, в pom.xml просто нужно поставить scala-library перед spark-cassandra-connector.

  2. Могут быть другие нюансы совместимости компонентов, следует быть готовым к отладке. Полностью рабочую конфигурацию можно посмотреть в репозитории.

Подготовка инфраструктуры

В этот раз постараюсь этот вопрос изложить покороче, но на важных моментах остановлюсь подробнее. Подготовка образа executor подробно описана в предыдущей статье, но в этой сам метод сборки будет отличаться. В первую очередь нужно подготовить кластер Kubernetes. Описывать установку не буду, скажу лишь, что взял KubeSpray и раскатал 1.26.3.

Nvidia drivers

Выбрать последнюю Cuda, драйвер поставится автоматом:

install CUDA and NVidia driver
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-ubuntu2204.pin
sudo mv cuda-ubuntu2204.pin /etc/apt/preferences.d/cuda-repository-pin-600
wget https://developer.download.nvidia.com/compute/cuda/12.1.0/local_installers/cuda-repo-ubuntu2204-12-1-local_1
sudo dpkg -i cuda-repo-ubuntu2204-12-1-local_12.1.0-530.30.02-1_amd64.deb
sudo cp /var/cuda-repo-ubuntu2204-12-1-local/cuda-*-keyring.gpg /usr/share/keyrings/
sudo apt-get update
sudo apt-get -y install cuda

Выполнить шаги установки, перезагрузить машину, проверить:

nvidia-smi
nvidia-smi
Sun Mar 19 20:56:18 2023       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 530.30.02              Driver Version: 530.30.02    CUDA Version: 12.1     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                  Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf            Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|=========================================+======================+======================|
|   0  NVIDIA GeForce GTX 1650         On | 00000000:02:00.0 Off |                  N/A |
|  0%   47C    P2               N/A /  75W|    332MiB /  4096MiB |      2%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                                         
+---------------------------------------------------------------------------------------+
| Processes:                                                                            |
|  GPU   GI   CI        PID   Type   Process name                            GPU Memory |
|        ID   ID                                                             Usage      |
|=======================================================================================|
|    0   N/A  N/A      1346      G   /usr/lib/xorg/Xorg                           62MiB |
|    0   N/A  N/A      1905    C+G   ...libexec/gnome-remote-desktop-daemon      132MiB |
|    0   N/A  N/A      2013      G   /usr/bin/gnome-shell                        131MiB |
+---------------------------------------------------------------------------------------+

Донастроить:

nvidia-container-toolkit
sudo -i
apt-get update && apt-get install -y nvidia-container-toolkit

distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list

sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
sudo systemctl restart docker

nvidia-ctk runtime configure --runtime=docker

Не обязательно: запустить для проверки контейнеры со спарк воркерами (можно взять из прошлой статьи).

Containerd

Выполнить:

nano /etc/containerd/config.toml

Отредактировать:

config.toml
version = 2
root = "/var/lib/containerd"
state = "/run/containerd"
oom_score = 0

[grpc]
  max_recv_message_size = 16777216
  max_send_message_size = 16777216

[debug]
  level = "info"

[metrics]
  address = ""
  grpc_histogram = false

[plugins]
  [plugins."io.containerd.grpc.v1.cri"]
    sandbox_image = "registry.k8s.io/pause:3.8"
    max_container_log_line_size = -1
    enable_unprivileged_ports = false
    enable_unprivileged_icmp = false
    [plugins."io.containerd.grpc.v1.cri".containerd]
      default_runtime_name = "nvidia"
      snapshotter = "overlayfs"

      [plugins."io.containerd.grpc.v1.cri".containerd.runtimes]
        [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia]
          privileged_without_host_devices = false
          runtime_engine = ""
          runtime_root = ""
          runtime_type = "io.containerd.runc.v2"

          [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.nvidia.options]
            BinaryName = "/usr/bin/nvidia-container-runtime"

        [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc]
          runtime_type = "io.containerd.runc.v2"
          runtime_engine = ""
          runtime_root = ""
          base_runtime_spec = "/etc/containerd/cri-base.json"

          [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options]
            systemdCgroup = true

    [plugins."io.containerd.grpc.v1.cri".registry]
      [plugins."io.containerd.grpc.v1.cri".registry.mirrors]
        [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"]
          endpoint = ["https://registry-1.docker.io"]
      [plugins."io.containerd.grpc.v1.cri".registry.configs]
        [plugins."io.containerd.grpc.v1.cri".registry.configs."registry-1.docker.io".auth]
            username = "login"
            password = "password"
            auth = ""
            identitytoken = ""

Обращаю внимание на последнюю секцию - в ней указывается ваш репозиторий, здесь я привел пример с публичным Docker Hub "registry-1.docker.io".

Перезапустить сервис:

sudo systemctl restart containerd

Kubernetes

Начиная с версии Spark 3.1.1, поддержка Spark в Kubernetes доведена production-ready. Это означает, что появлилась возможность использовать Kubernetes как менеджер кластера Spark, указывая в качестве мастера Kube API Server:

k8s://https://${KUBE_API_SERVER}:6443

или

k8s://https://kubernetes.default.svc

Теперь, при запуске Spark Job, в кластере K8S будут запускаться Spark Executors с указанным Docker образом, ресурсами и прочими необходимыми конфигами. Если образ уже находится в локальном репозитории машины, POD поднимается довольно быстро, на моих машинах в течении 5 секунд после старта приложения поднимались 2 экзекуктора, полностью готовых к работе.

В связке со Spring приложением есть возможность создать Spring Bean с JavaSparkContext/SparkSession. Однако, есть другой способ: создавать SparkSession под каждую Spark Job, чтобы высвобождать ресурсы кластера. Это хорошо для редких задач по расписанию / запросу, в том числе запросам с различающимся количеством необходимых ресурсов (количество Spark Executors, CPU, память, видеопамять и т.п.), но имеет накладные расходы на поднятие экзекуторов и время выполнения задач. На моей практике все команды для работы со Spark требовали “прогрева”, т.е. каждая первая операция выполнялась дольше, чем все последующие. Стоит иметь это в виду, и, если необходимо постоянно запускать одинаковые задачи по расписанию, имеет смысл зарезервировать ресурсы кластера под спарковые экзекуторы.

Nvidia GPU Operator

Для использования ресурсов GPU внутри кластера Kubernetes необходимо установить Nvidia GPU Operator. Следует выполнить:

Install GPU Operator
kubectl create -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.13.0/nvidia-device-plugin.yml

helm repo add nvidia https://nvidia.github.io/gpu-operator
helm repo update

kubectl create ns gpu-operator
     
helm install --wait gpu-operator \
     -n gpu-operator \
     nvidia/gpu-operator \
     --set operator.defaultRuntime=containerd

Проверить работу можно следующим образом:

kubectl run gpu-test \
     --rm -t -i \
     --restart=Never \
     --image=nvcr.io/nvidia/cuda:10.1-base-ubuntu18.04 nvidia-smi

В случае успеха вернется результат nvidia-smi:

В целях проверки можно запустить deployment с двумя репликами заготовленного образа Spark executor:

executor deployment
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: cuda-example
spec:
  replicas: 2
  selector:
    matchLabels:
      app: cuda-example
  template:
    metadata:
      labels:
        app: cuda-example
    spec:
      containers:
        - name: cuda-jdk8
          image: repo/cuda-jdk8-spark-3.3.2:v2
          resources:
            limits:
              cpu: 2
              memory: 4Gi
              nvidia.com/gpu: 1
            requests:
              cpu: "200m"
              memory: 0.5Gi
              nvidia.com/gpu: 1

на каждой машине должно подняться по одному поду, и, если экзекнуться в них, команда nvidia-smi должна вывести результат. Обращаю внимание, что для запуска экзекуторов не нужно поднимать их вручную, K8S Scheduler сделает это сам. Манифест приведен для примера и проверки работоспособности GPU Operator, после проверки его можно удалить.

Образ Spark executor

Следует изменить spark/kubernetes/dockerfiles/Dockerfile:

Dockerfile
ARG java_image_tag=17-jre
FROM ${java_image_tag}

ARG spark_uid=1001
ARG UID_GID=1001
ENV UID=${UID_GID}
ENV GID=${UID_GID}

ENV SPARK_RAPIDS_DIR=/opt/sparkRapidsPlugin
ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-23.04.0.jar

RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools iptables sudo \
    wget software-properties-common build-essential libnss3-dev zlib1g-dev libgdbm-dev libncurses5-dev \
    libssl-dev libffi-dev libreadline-dev libsqlite3-dev libbz2-dev python3 && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/conf && \
    mkdir -p /opt/spark/work-dir && \
    mkdir -p /opt/sparkRapidsPlugin && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*
RUN apt-get install libnccl2 libnccl-dev -y --allow-change-held-packages && rm -rf /var/cache/apt/*

COPY jars /opt/spark/jars
COPY rapids ${SPARK_RAPIDS_DIR}
COPY extraclasspath /opt/spark/extraclasspath
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
COPY datasets /opt/spark/

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
RUN chmod a+x /opt/entrypoint.sh
RUN chmod a+x /opt/sparkRapidsPlugin/getGpusResources.sh
RUN ls -lah /opt

RUN groupadd --gid $UID appuser && useradd --uid $UID --gid appuser --shell /bin/bash --create-home appuser
RUN mkdir /var/logs && chown -R appuser:appuser /var/logs
RUN mkdir /opt/spark/logs && chown -R appuser:appuser /opt/spark/
RUN chown -R appuser:appuser /tmp

RUN ls -lah /home/appuser
RUN touch /home/appuser/.bashrc

RUN echo -e '\
export SPARK_HOME=/opt/spark\n\
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin\
' > /home/appuser/.bashrc

RUN chown -R appuser:appuser /home/appuser

EXPOSE 4040
EXPOSE 8081

USER ${spark_uid}

ENTRYPOINT [ "/opt/entrypoint.sh" ]

entrypoint.sh из прошлой статьи пригодится, но для локального standalone кластера из одного воркера (подходит для отладки в Windows). spark/kubernetes/dockerfiles/entrypoint.sh следует изменить до стандартного вида:

entryoint.sh
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# echo commands to the terminal output
set -ex

# Check whether there is a passwd entry for the container UID
myuid=$(id -u)
mygid=$(id -g)
# turn off -e for getent because it will return error code in anonymous uid case
set +e
uidentry=$(getent passwd $myuid)
set -e

# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
    if [ -w /etc/passwd ] ; then
	echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
    else
	echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
    fi
fi

SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt

if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
  SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi

if ! [ -z ${PYSPARK_PYTHON+x} ]; then
    export PYSPARK_PYTHON
fi
if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
    export PYSPARK_DRIVER_PYTHON
fi

# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n "${HADOOP_HOME}"  ] && [ -z "${SPARK_DIST_CLASSPATH}"  ]; then
  export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)"
fi

if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
  SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

if ! [ -z ${SPARK_CONF_DIR+x} ]; then
  SPARK_CLASSPATH="$SPARK_CONF_DIR:$SPARK_CLASSPATH";
elif ! [ -z ${SPARK_HOME+x} ]; then
  SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
fi

case "$1" in
  driver)
    shift 1
    CMD=(
      "$SPARK_HOME/bin/spark-submit"
      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
      --deploy-mode client
      "$@"
    )
    ;;
  executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
    )
    ;;

  *)
    echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..."
    CMD=("$@")
    ;;
esac

# Execute the container CMD under tini for better hygiene
exec /usr/bin/tini -s -- "${CMD[@]}"

Файл spark/kubernetes/dockerfiles/decom.sh должен выглядеть следующим образом:

decom.sh
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


set -ex
echo "Asked to decommission"
# Find the pid to signal
date | tee -a ${LOG}
WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ \t]+/, ""); print }')
echo "Using worker pid $WORKER_PID"
kill -s SIGPWR ${WORKER_PID}
# For now we expect this to timeout, since we don't start exiting the backend.
echo "Waiting for worker pid to exit"
# If the worker does exit stop blocking the cleanup.
timeout 60 tail --pid=${WORKER_PID} -f /dev/null
date
echo "Done"
date
sleep 1

Скрипт сборки образов, который собирает два образа - для чистого JVM и PySpark (на всякий случай):

build-java-8-spark-3.3.2.sh
#!/bin/bash
echo "=========== 1st stage ==========="
echo "Docker repo: $DOCKER_REPO"
echo "### build base cuda11.8 java8 image ###"
docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda11.8-jdk8:v1 .

echo "=========== 2nd stage ==========="
echo "### build Spark and PySpark images ###"
cd spark-3.3.2

./bin/docker-image-tool.sh -r localhost:5000 \
      -t jdk8-3.3.2 -b java_image_tag=localhost:5000/cuda11.8-jdk8:v1 \
      -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile \
      -n build

cd ..

docker tag localhost:5000/spark-py:jdk8-3.3.2 $DOCKER_REPO/cuda-jdk8-spark-py-3.3.2:v2
docker tag localhost:5000/spark:jdk8-3.3.2 $DOCKER_REPO/cuda-jdk8-spark-3.3.2:v2

echo "=========== 3rt stage ==========="
echo "### push Spark and PySpark images ###"
docker push $DOCKER_REPO/cuda-jdk8-spark-py-3.3.2:v2
docker push $DOCKER_REPO/cuda-jdk8-spark-3.3.2:v2

, где $DOCKER_REPO - ваш репозиторий Docker.

Для сборки образа executor необходим образ localhost:5000/cuda-jdk8:v1. Его можно собрать со следующим Dockerfile:

Dockerfile localhost:5000/cuda11.8-jdk8:v1
FROM nvcr.io/nvidia/cuda:11.8.0-runtime-ubuntu22.04

ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8'

ARG DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools

RUN apt-get update && \
	apt-get install -y openjdk-8-jdk && \
	apt-get install -y ant && \
	apt-get clean && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

RUN apt-get update && \
	apt-get install -y ca-certificates-java && \
	apt-get clean && \
	update-ca-certificates -f && \
	rm -rf /var/lib/apt/lists/* && \
	rm -rf /var/cache/oracle-jdk8-installer;

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME

CMD ["tail", "-f", "/dev/null"]

Команда сборки приведена в скрипте выше:

docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda-jdk8:v1 .

Требования для запуска Spark Driver

Service account

Для драйвера необходим K8S Service Account с полными правами на неймспейс (в данном примере namespace default, но в продуктивном кластере, разумеется, следует выделять отдельный namespace). Следующий манифест создает необходимы SA и CRB:

Service Account и ClusterRoleBinding
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-driver
  namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-driver-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: spark-driver
    namespace: default

Имя SA указывается в конфиге SparkContext:

.set("spark.kubernetes.authenticate.driver.serviceAccountName", driverSa)

В манифесте приложения так же указывается SA:

serviceAccountName: spark-driver

В идеале, для Executor необходим отдельный SA с ограниченными правами (ClusterRole “edit”), но в данном примере он не конфигурировался.

Headless service

Для того, чтобы экзекуторы могли делать запросы к драйверу, необходимо сделать headless service:

kubectl expose pod $SPARK_DRIVER_NAME --port=$SPARK_DRIVER_PORT \
  --type=ClusterIP --cluster-ip=None

Манифест такого сервиса выглядит следующим образом:

headless service
kind: Service
apiVersion: v1
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  type: ClusterIP
  clusterIP: None
  clusterIPs:
    - None
  internalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  ports:
    - name: driver-port
      port: 33139
      targetPort: 33139
      protocol: TCP
    - name: block-manager-port
      port: 45029
      targetPort: 45029
      protocol: TCP
  selector:
    app: ml-app
  sessionAffinity: None

Запуск Spring приложения

На ряду с указанием мастера, необходимо указать параметр

spark.submit.deployMode

Значением cluster или client.

Cluster mode

В режиме работы Cluster приложение (spark driver) существует вне кластера K8S, его под не создается внутри кластера. Хорошо подходит для отладки, либо когда приложение запускается в отдельном кластере/сервере.

Client mode

В режиме работы Client приложение (spark driver) существует в кластере, поднимается как POD. При этом следует указать дополнительные настройки (будет рассмотрено ниже, в разделе с конфигурацией приложения).

Манифест приложения

application.yml
---
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  serviceAccountName: spark-driver
  containers:
    - name: ml-app
      image: repo/ml:cuda11.8-jdk8
      imagePullPolicy: Always
      env:
        - name: "SPARK_EXECUTORS"
          value: "2"
        - name: "SPARK_MODE"
          value: "client"
        - name: "POSTGRES_HOST"
          value: "192.168.0.125"
        - name: "CASSANDRA_HOST"
          value: "192.168.0.125,192.168.0.150"
          # add your own env vars from ConfigMaps/Secrets
      resources:
        limits:
          cpu: "2"
          memory: 4Gi
        requests:
          cpu: "200m"
          memory: 0.5Gi
      ports:
        - containerPort: 4040
        - containerPort: 9090
        - containerPort: 33139
        - containerPort: 45029
---
kind: Service
apiVersion: v1
metadata:
  labels:
    app: ml-app
  name: ml-app
spec:
  type: ClusterIP
  clusterIP: None
  clusterIPs:
    - None
  internalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  ports:
    - name: driver-port
      port: 33139
      targetPort: 33139
      protocol: TCP
    - name: block-manager-port
      port: 45029
      targetPort: 45029
      protocol: TCP
  selector:
    app: ml-app
  sessionAffinity: None

После применения манифеста автоматически поднимаются два Spark Executor в отдельных подах:

Cassandra

Cassandra — великолепный инструмент для хранения больших объемов данных. Это OLTP БД, те. рассчитанная на частые записи. Также особенностью этой БД является то, что модель данных нужно строить исходя из того, как будут запрашиваться данные: при не корректном задании ключей партиционирования и ключей кластеризации таблиц легко прийти к запросам с ALLOW FILTERING или организовать неравномерное заполнение узлов кластера Cassandra.

Для частных запросов на чтение больше подходят аналитические (OLAP) БД. Если посмотреть в сторону wide‑column DB, то аналогом Cassandra может быть Clickhouse. Однако, бывают различные случаи принципиального выбора Cassandra как источника аналитических данных и невозможности использования Clickhouse: недостаток вычислительных ресурсов, компетенций команд разработки и администрирования, и прочие ограничения.

Результатом такого решения может быть сложность выборки необходимых данных, так как, в отличие от реляционных БД, в Cassandra нет механизма отношений и поддержки join‑операций. В таком случае можно использовать Apache Spark, который позволяет выполнять запросы к Cassandra как к реляционной базе данных. Если имеется несколько Cassandra датацентров, один из них можно настроить для аналитических операций, разгрузив при этом датацентр(ы) для транзакций записи.

Связкой Spark‑Cassandra можно реализовать ETL‑процессы, как между таблицами Cassandra, так и в другие БД, например, в тот же Clickhouse, используя различные коннекторы Spark.

Сущности Big Data системы

Случайно или нет, статья публикуется во время сезона Big Data на Habr. Поскольку статья является руководством и ориентирована в том числе для новичков в этой области, кратко рассмотрим некоторые ключевые понятия: что такое ETL, Data Lake, DWH и Data Mart.

Data Lake — это централизованное хранилище данных, которое позволяет хранить данные в исходном формате без предварительной структуризации или преобразования. Data Lake предоставляет возможность хранить большие объемы разнородных данных, включая структурированные, полуструктурированные и неструктурированные данные. Это позволяет проводить различные виды анализа, исследований и обработки данных в будущем, когда появится необходимость.

DWH — это хранилище данных, и это специально организованная структура, предназначенная для хранения и управления данными. DWH интегрирует данные из разных источников, таких как транзакционные базы данных, файлы, веб‑сервисы и другие источники данных. Он предоставляет единое и консолидированное представление данных, которое удобно для анализа и отчетности.

Data Mart (Витрина данных) представляет собой сегментированное подмножество данных из хранилища данных (т. е. является подмножеством DWH), ориентированное на конкретные потребности бизнеса или отделов компании. Data Mart содержит данные, специально организованные и структурированные для поддержки аналитических запросов и принятия решений в конкретной области или функциональном подразделении компании.

ETL (Извлечение, Трансформация и Загрузка) — это процесс, который обеспечивает передачу данных из исходных источников в целевые системы хранения данных. В процессе ETL данные извлекаются из различных источников, затем подвергаются трансформации, включающей очистку, фильтрацию, преобразование и объединение данных, и, наконец, загружаются в целевую систему хранения данных, такую как DWH или Data Mart. Процесс ETL играет важную роль в обеспечении актуальности, целостности и качества данных в аналитической системе.

Пример процесса работы с данными в рассматриваемой системе

Рассмотрим, как можно использовать получившуюся систему на конкретном примере. Предметная область — аналитика цен финансовых инструментов, торгующихся на фондовых рынках. Так система строится на основе вычислений на GPU, попробуем построить модель машинного обучения, которая предсказывает цену курса акций NVidia (тикер NVDA) на несколько периодов вперед. Анализ проводился на данных дневного таймфрейма (1 day) с предсказанием на 2 периода (2 дня) вперед.

Схематично, система выглядит следующим образом:

Красной пунктирной линией показаны ETL процессы из источников данных, которые в рассматриваемой системе можно принять за Data Lakes. Данные из них забираются сервисом Data Extractor, который развернут на k8s node pool только с CPU, и записываются в OLTP DWH кластер Cassandra.

По необходимости/расписанию/событию Analytics Service, который развернут на k8s node pool с GPU, забирает данные из нескольких таблиц DWH кластера, трансформирует их в единую структуру и записывает в Data Mart кластер Cassandra, который может быть настроен для OLAP операций. Это тоже ETL процесс, он обозначен черными сплошными линиями, выполняется посредством Spark Executors, которые назначены Analytics Service — в терминологии Spark это driver.

Говоря об инфраструктуре решения, не будет лишним упомянуть еще пару инструментов для работы в с Big Data системой:

Apache Zeppelin — мощный инструмент визуализации и анализа данных. Он предоставляет интерактивную среду для разработки, выполнения и представления результатов вычислений на основе больших объемов данных. С помощью Apache Zeppelin можно создавать и запускать ноутбуки, которые содержат код на различных языках программирования, включая Java, Scala, Kotlin и многие другие. Zeppelin обладает широким набором интегрированных визуализаций и возможностей интерактивного анализа данных, что делает его полезным инструментом для работы с результатами аналитики и машинного обучения. Zeppelin имеет возможность работы с интерпретатором Spark.

Spring Cloud Data Flow (SCDF) — это распределенная система управления потоками данных (Data Flow) в облачной среде. Она предоставляет инфраструктуру и инструменты для развертывания, управления и мониторинга сложных потоков данных между различными источниками и приемниками данных. SCDF позволяет создавать и конфигурировать потоки данных в виде графа, состоящего из различных компонентов обработки, таких как источники, преобразования, фильтры и назначения. С помощью SCDF можно управлять и масштабировать потоки данных в распределенной среде.

Оба эти инструмента, Apache Zeppelin и Spring Cloud Data Flow, могут быть полезными в контексте рассматриваемой системы для аналитики цен финансовых инструментов. Apache Zeppelin предоставит удобную среду для визуализации и анализа данных, а Spring Cloud Data Flow поможет в управлении потоками данных и обработке информации между различными компонентами системы.

Конфигурация приложения

Стоит подробнее рассмотреть конфигурацию приложения для работы со Spark.

public static JavaSparkContext sparkContext(SparkSettings sparkSettings) {
        String host = sparkSettings.getHost();
        SparkConf sparkConf = new SparkConf(true)
                .setAppName(sparkSettings.getAppName())
                .setMaster(sparkSettings.getMasterHost())
                .setJars(sparkSettings.getJars())
                ...
                .set("spark.driver.resource.gpu.amount", "0") // (1)
                ...
                // Cassandra // (2)
                .set("spark.jars.packages", "datastax:spark-cassandra-connector:3.3.0-s_2.12")
                .set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
                .set("spark.cassandra.auth.username", sparkSettings.getCassandraUser())
                .set("spark.cassandra.auth.password", sparkSettings.getCassandraPass())
                .set("spark.cassandra.connection.host", sparkSettings.getCassandraHosts());

        if (sparkSettings.isKubernetesEnabled()) { // (3)
            sparkConf.set("spark.kubernetes.container.image", sparkSettings.getExecutorImage());

            if (sparkSettings.getDeployMode().equals("client")) {
                sparkConf
                    .set("spark.kubernetes.driver.pod.name", sparkSettings.getHostName())
                    .set("spark.kubernetes.namespace", sparkSettings.getNamespace())
                    .set("spark.kubernetes.authenticate.driver.serviceAccountName", sparkSettings.getDriverSa());
            }
        }
        return new JavaSparkContext(sparkConf);
    }

(1) — если приложение (spark driver) не использует ресурсы GPU напрямую, следует указать в данной настройке «0», чтобы под него не резервировался необходимый Spark Executor'ам ресурс.

(2) — настройки cassandra spark connector. Для подключения экзекуторов к кластеру Cassandra следует указать данные настройки. Коннектор cassandra имеет большое число настроек, все можно найти здесь;

(3) — так как приложение может работать не только в k8s, настройки, которые предназначены только для него, не стоит указывать. Здесь должен быть указан, как минимум, необходимый для работы приложения образ spark executor, имя driver k8s Pod (можно получить с помощью InetAddress.getLocalHost().getHostName();), namespace, в котором будут создаваться executor Pods, и Service Account. Все это было создано ранее.

JavaSparkContext может быть передан в объект SparkSession, который далее будет использоваться для работы с executor'ами.

Работа с Cassandra в Spark

Получение данных

В сервисе будем использовать Dataset как объект с данными. Существуют так же RDD и Dataframe, и, почему в Java/Kotlin стоит использовать Dataset, можно почитать здесь и здесь.

Базовый метод получения данных из кассандра:

AbstractCassandraRepository
abstract class AbstractCassandraRepository constructor(
    private val sparkSession: SparkSession
) {

    companion object {
        internal const val keyspace: String = "instrument_data"
    }

    fun cassandraDataset(keyspace: String, table: String): Dataset<Row> {
        val cassandraDataset: Dataset<Row> = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", keyspace)
            .option("table", table)
            .load()

        cassandraDataset.createOrReplaceTempView(table)
        return cassandraDataset
    }
}

Здесь мы работаем со спарковой сессией: указываем ей, что Dataset<Row> должен быть прочитан из кассандры, указываем кейспейс и таблицу, указываем имя датасета, чтобы в последующих запросах можно было обращаться к колонкам таблицы по ее имени.

Естественно, никого не интересует получать все данные и делать full scan таблицы — у каждой есть Partition Key, по которому нужно искать необходимые данные.

Рассмотрим пример получения данных из таблицы instrument_data.time_series_history:

instrument_data.time_series_history
create table instrument_data.time_series_history
(
    ticker      text,
    task_number uuid,
    datetime    timestamp,
    timeframe   text,
    close       decimal,
    high        decimal,
    low         decimal,
    open        decimal,
    volume      bigint,
    primary key (ticker, task_number, datetime, timeframe)
    )
    with compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};

create index time_series_history_datetime_index
    on instrument_data.time_series_history (datetime);

create index time_series_history_task_number_index
    on instrument_data.time_series_history (task_number);

create index time_series_history_timeframe_index
    on instrument_data.time_series_history (timeframe);

У нее имеется Partition Key ticker и три Clustering Column:

task_number - номер задания на получение и анализ данных в смежной системе;

datetime - дата и время записи;

timeframe - таймфрейм временного ряда (1day в рассматриваемом случае).

Задача — доставать данные по ticker, task_number и datetime between (dateStart, dateEnd) из всех таблиц.

AbstractIndicatorRepository
abstract class AbstractIndicatorRepository constructor(
    sparkSession: SparkSession,
    private val table : String
) : AbstractCassandraRepository(sparkSession) {

    fun getBaseDataSet(
        ticker: String,
        taskNumber : UUID,
        dateStart : LocalDate,
        dateEnd : LocalDate
    ): Dataset<Row> {
        return cassandraDataset(table)
            .filter(
                functions.col("ticker").equalTo(ticker)
                    .and(functions.col("task_number").equalTo(taskNumber.toString()))
                    .and(functions.col("datetime").between(dateStart, dateEnd)))
    }
}

Таким образом, можно обратиться к предыдущему методу и отфильтровать значения. Функционал данного метода будет аналогичен запросу данных из таблицы с указанными полями. При этом Spark не будет доставать весь датасет, затем его фильтровать — в экзекуторы попадут уже отфильтрованные данные.

Данный метод аналогичен запросу:

select *
from instrument_data.time_series_history
where ticker = ?1
  AND task_number = ?2
  AND datetime > ?3 and datetime < ?4;

Если не передавать partition key, увеличение времени запроса можно отследить на DAG запроса в Spark UI. Это лежит на поверхности, можно очень быстро разобраться, на получении каких данных запрос тормозит работу.

Чтобы выполнить select по определенным столбцам, уже следует создать репозиторий для отдельной таблицы и выполнить запрос следующим образом:

TimeSeriesRepository
@Component
class TimeSeriesRepository(
    sparkSession: SparkSession
) : AbstractIndicatorRepository(sparkSession, "time_series_history") {

    fun getDataset(
        ticker: String,
        taskNumber : UUID,
        dateStart : LocalDate,
        dateEnd : LocalDate
    ) : Dataset<Row> {
        val dataset = getBaseDataSet(ticker, taskNumber, dateStart, dateEnd)
            .selectExpr(
                "to_date(datetime) as dateTime",
                "CAST(close AS Double) as close"
            )

        dataset.createOrReplaceTempView("ts")
        return dataset
    }
}

Тогда конечный датасет вернет таблицу из двух столбцов с датой и временем и ценой закрытия, преобразованной в Double. Double необходим при передаче данных на вычисление в GPU.

Получаем все необходимые данные:

getMainDataset
private fun getMainDataset(
    ticker : String,
    taskNumber : UUID,
    dateStart : LocalDate,
    dateEnd : LocalDate,
    currentOffset : Int,
    batchSize : Int
) : Dataset<Row> {
    val timeSeries = timeSeriesRepository
        .getDataset(ticker, taskNumber, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("ts")
    val emaDataSet = emaRepository
        .getEmaDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("ema")
    val stochasticDataset = stochasticRepository
        .getStochasticDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("stoch")
    val bBandsDataset = bBandIndicatorRepository
        .getBBandsDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("bb")
    val macdDataset = macdRepository
        .getMacdDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("macd")
    val rsiDataset = rsiRepository
        .getRsiDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("rsi")
    val smaDataset = smaRepository
        .getSmaDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("sma")
    val willrDataset = willrRepository
        .getWillrDataSet(ticker, dateStart, dateEnd, currentOffset, batchSize)
        .`as`("willr")

    return combineDatasets(
        timeSeries, emaDataSet, stochasticDataset, bBandsDataset, macdDataset, rsiDataset, smaDataset, willrDataset
    )
}

Здесь у Java/Kotlin разработчика может возникнуть желание сделать запрос данных через CompletableFuture<Dataset<Row>> и выполнить их все в одном тредпуле. Однако, не стоит этого делать. Спарк сам распараллеливает работу и создает FutureTask's, программисту не стоит распараллеливать код на разные потоки.

и комбинируем:

combineDatasets
private fun combineDatasets(
    timeSeries : Dataset<Row>,
    emaDataSet : Dataset<Row>,
    stochasticDataset : Dataset<Row>,
    bBandsDataset : Dataset<Row>,
    macdDataset : Dataset<Row>,
    rsiDataset : Dataset<Row>,
    smaDataset : Dataset<Row>,
    willrDataset : Dataset<Row>
): Dataset<Row> {

    val result = timeSeries
        .join(emaDataSet, timeSeries.col("datetime")
                .equalTo(emaDataSet.col("datetime")), "leftouter")
        .join(stochasticDataset, timeSeries.col("datetime")
                .equalTo(stochasticDataset.col("datetime")), "leftouter")
        .join(bBandsDataset, timeSeries.col("datetime")
                .equalTo(bBandsDataset.col("datetime")), "leftouter")
        .join(macdDataset, timeSeries.col("datetime")
                .equalTo(macdDataset.col("datetime")), "leftouter")
        .join(rsiDataset, timeSeries.col("datetime")
                .equalTo(rsiDataset.col("datetime")), "leftouter")
        .join(smaDataset, timeSeries.col("datetime")
                .equalTo(smaDataset.col("datetime")), "leftouter")
        .join(willrDataset, timeSeries.col("datetime")
                .equalTo(willrDataset.col("datetime")), "leftouter")
        .selectExpr(
            "ts.dateTime as dateTime",
            "ema.emaTimePeriod as emaTimePeriod",
            "ema.ema as ema",
            "stoch.fastKPeriod as fastKPeriod",
            "stoch.slowKPeriod as slowKPeriod",
            "stoch.slowDPeriod as slowDPeriod",
            "stoch.slowD as slowD",
            "stoch.slowK as slowK",
            "bb.timePeriod as bbTimePeriod",
            "bb.sd as bbSd",
            "bb.lowerBand as lowerBand",
            "bb.middleBand as middleBand",
            "bb.upperBand as upperBand",
            "macd.signalPeriod as signalPeriod",
            "macd.fastPeriod as fastPeriod",
            "macd.slowPeriod as slowPeriod",
            "macd.macd as macd",
            "macd.macdHist as macdHist",
            "macd.macdSignal as macdSignal",
            "rsi.timePeriod as rsiTimePeriod",
            "rsi.rsi as rsi",
            "sma.timePeriod as smaTimePeriod",
            "sma.sma as sma",
            "willr.timePeriod as willrTimePeriod",
            "willr.willr as willr",
            "ts.close as close"
        )

    val windowSpec = Window.orderBy(functions.asc("dateTime"))
    return result
        .withColumn("id", functions.row_number().over(windowSpec))
        .filter(functions.col("dateTime").isNotNull)
}

В методе combineDatasets происходит объединение таблиц, которое Cassandra не поддерживает, а Spark предоставляет такую возможность.

В результирующий датасет добавляется колонка id, которая содержит номер строки. Здесь же предварительно сортируются данные по колонке dateTime и отфильтровываются строки с пустыми ячейками.

Стоит отметить, что есть иной способ работы с Dataset — можно обратиться к объекту sparkSession, вызвать метод sql() и передать в него обычный SQL запрос на объединение данных, и он будет работать.

Если все‑таки предпочтительно использовать SQL запросы, можно это делать, различий в скорости работы в моих без учета предварительных фильтров для датасетов не было замечено. Spark поддерживает все основные функции, с единственной оговоркой — не стоит писать свои собственные функции (UDF) и стараться выполнить запрос, это плохо влияет на производительность. За помощью в этом вопросе следует обратиться к оф. документации. Spark понимает ANSI формат. Если нужно перевести запрос, написанный на другом диалекте, можно воспользоваться инструментом.

В абзаце выше не просто так выделено полужирным с подчеркиванием одно важное условие. Допустим, мы получили датасет timeSeries, и сразу же хотим через sparkSession.sql() нативным запросом сджойнить остальные таблицы. Опытным путем установлено, что sparkSession при запросе через метод sql() с переданным нативным запросом делает фильтр только по основной таблице timeSeries, и не применяет фильтры для остальных таблиц, тем самым получая все данные из присоединяемых таблиц кассандры, что самым негативным образом сказывется на производительности — от 2 до 3 и более раз.

Использовать sparkSession.sql() можно, но только на тех данных, которые невозможно предварительно отфильтровать при выполнении основной логики джобы. А еще лучше — не использовать, и писать запросы через методы.

Рассмотрим метод:

getDatasetWithLabel
fun getDatasetWithLabel(
    ticker: String,
    taskNumber: UUID,
    dateStart: LocalDate,
    dateEnd: LocalDate,
    offset : Long,
    currentOffset : Int,
    batchSize : Int
): Dataset<Row> {
    val mainDataset = getMainDataset(ticker, taskNumber, dateStart, dateEnd, currentOffset, batchSize)
    return datasetWithLabel(mainDataset, offset)
}

Здесь происходит получение основных данных, затем они передаются в метод datasetWithLabel. Мы хотим натренировать модель машинного обучения, и нам нужен label - колонка со значениями цены закрытия торгов финансовым инструментом на дату offset позднее (в нашем случае 2 дня).

datasetWithLabel
private fun datasetWithLabel(
    mainDataset: Dataset<Row>,
    offset: Long
): Dataset<Row> {
    val labelDataset = getLabelDatasetFromMain(mainDataset, offset)

    val combinedDataset = labelDataset
        .join(
            mainDataset, labelDataset.col("id_eval")
                .equalTo(mainDataset.col("id")), LEFT_OUTER_JOIN
        )
        .withColumn("dateTimeUnix", functions.unix_timestamp(functions.col("dateTime")))
        .withColumn("labelDateTimeUnix", functions.unix_timestamp(functions.col("labelDateTime")))

    return combinedDataset
        .withColumn(
            "id",
            functions.coalesce(combinedDataset.col("id_eval"), combinedDataset.col("id"))
        )
        .drop("id_eval")
        .selectExpr(*allColumns)
        .filter(functions.col("dateTime").isNotNull)
        .orderBy(functions.asc("dateTime"))
}

private fun getLabelDatasetFromMain(
    mainDataset: Dataset<Row>,
    offset: Long
): Dataset<Row> {
    val windowSpec = Window.orderBy(functions.asc("dateTime"))
    val labelDateTimeColumn = functions.lead(mainDataset.col("dateTime"), offset.toInt()).over(windowSpec)
    val labelColumn = functions.lead(mainDataset.col("close"), offset.toInt()).over(windowSpec)

    val labelDataset = mainDataset
        .withColumn("labelDateTime", labelDateTimeColumn)
        .withColumn("label", labelColumn)
        .filter(functions.col("labelDateTime").isNotNull)
        .select("labelDateTime", "label")
        .orderBy("labelDateTime")

    return labelDataset
        .withColumn("id_eval", functions.row_number()
            .over(Window.orderBy(functions.asc("labelDateTime"))))
        .select("id_eval", "labelDateTime", "label")
}

В методе getLabelDatasetFromMain мы используем полученный исходный датасет для как источник данных для новых колонок - используя функцию lead

val labelDateTimeColumn = functions.lead(mainDataset.col("dateTime"), offset.toInt()).over(windowSpec)
val labelColumn = functions.lead(mainDataset.col("close"), offset.toInt()).over(windowSpec)

Мы определяем дату и цену финансового инструмента на offset значений вперед. Следует понимать, что на бирже ведутся не каждый день, есть выходные и праздничные дни, когда биржа не работает. Таким образом, в исходных датасетах всегда будут промежутки по датам. Можно решать это путем модификации данных — заполнять промежутки с отсутствующими датами ценой последнего закрытия. А можно воспользоваться функцией lead() на имеющемся датасете. В данном примере я пошел именно таким путем, заодно получился хороший пример использования функций spark sql и составления датасетов. В этом же методе присоединяется новая колонка с id_eval.

Затем в методе datasetWithLabel два датасета объединяются по значениям id_eval и id, в результате чего получается финальный датасет.

Нам интересно получать данные батчами и перекладывать их в новую таблицу.

У меня получился такой не хитрый код для этой задачи:

DataTransformService
@Service
class DataTransformService(
    private val dataReaderService: DataReaderService,
    private val combinedDataRepository : CombinedDataRepository
) {

    companion object {
        private val log = LogManager.getLogger(this::class.java.name)
    }

    fun transformData(
        ticker : String,
        taskNumber : UUID,
        dateStart : LocalDate,
        dateEnd : LocalDate,
        offset : Long,
        batchSize : Int
    ) {
        var currentBatchOffset = 0
        var i = 0

        var tdf: Dataset<Row>?
        do {
            log.info("Data transformation: task {}, iteration {},: currentOffset {}",
                taskNumber,i + 1, currentBatchOffset)

            tdf = dataReaderService.getDatasetWithLabel(
                ticker, taskNumber, dateStart, dateEnd, offset, currentBatchOffset, batchSize
            )
            if (tdf.isEmpty) break

            combinedDataRepository.saveData(tdf.selectExpr(*DataReaderService.allColumns), ticker, taskNumber)
            currentBatchOffset += batchSize
            i++
        } while (tdf?.isEmpty == false)

        log.info("Data transformation of task {} completed", taskNumber)
    }
}

в методе combinedDataRepository.saveData(tdf.selectExpr(*DataReaderService.allColumns), ticker, taskNumber) производится сохранение полученного батча.

Сохранение данных из Spark в Cassandra

Сохранение в Cassandra также производится через коннектор:

fun saveDataSet(dataset: Dataset<Row>) {
    dataset.write()
        .format("org.apache.spark.sql.cassandra")
        .mode("append")                             // (1)
        .option("confirm.truncate", "false")        // (2)
        .option("keyspace", keyspace)
        .option("table", table)
        .save();
}

здесь, кроме уже очевидных параметров, есть два заслуживающих внимания:

(1) - режим, при котором производится запись. append - добавляет данные;

(2) - подтверждение удаления ранее записанных данных. Здесь выставлен false, и, в сочетании с предыдущей настройкой, спарк не будет стирать данные в кассандре при записи. С настройками

.mode("overwrite")
.option("confirm.truncate", "true")

имеется возможность очистить таблицу перед записью новых данных.

Перед записью предварительно можно подшаманить датасет для совпадения столбцов и прочих нужд:

CombinedDataRepository
@Component
class CombinedDataRepository(sparkSession: SparkSession)
    : AbstractIndicatorRepository(sparkSession, "combined_data") {

    fun saveData(
        dataset : Dataset<Row>,
        ticker : String,
        taskNumber : UUID
    ) {
        val modifiedDataset = dataset
            .withColumn("dateTime",
                from_unixtime(col("dateTimeUnix")))
            .selectExpr(
            "dateTime as datetime",
            "bbTimePeriod as bbands_time_period",
            "bbSd as sd",

            "emaTimePeriod as ema_time_period",
            "ema",

            "fastKPeriod as fast_k_period",
            "slowKPeriod as slow_k_period",
            "slowDPeriod as slow_d_period",
            "slowD as slow_d",
            "slowK as slow_k",

            "lowerBand as lower_band",
            "middleBand as middle_band",
            "upperBand as upper_band",

            "signalPeriod as macd_signal_period",
            "fastPeriod as macd_fast_period",
            "slowPeriod as macd_slow_period",
            "macd",
            "macdHist as macd_hist",
            "macdSignal as macd_signal",

            "rsiTimePeriod as rsi_time_period",
            "rsi",

            "smaTimePeriod as sma_time_period",
            "sma",

            "willrTimePeriod as willr_time_period",
            "willr",

            "close")
            .withColumn("ticker", lit(ticker))
            .withColumn("task_number", lit(taskNumber.toString()))
            .withColumn("timeframe", lit("1day"))
            .na().drop()

        saveDataSet(modifiedDataset)
    }
}

Здесь происходит преобразование имен столбцов к формату целевой таблицы. Также UUID приводится к string, проставляется константа таймфрейма (потому что потому), и, если есть строки с пустыми значениями, таковые удаляются.

Маппинг результатов

Получить результат работы Dataset<Row> можно несколькими способами:

используя Encoder:

val result = joinResult.as(Encoders.bean(ModelTrainResult.class));
val resultList : List<ModelTrainResult> = result.collectAsList();

или вручную разбирая каждый объект Row:

ModelTrainResult
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
data class ModelTrainResult @JsonCreator constructor(
    @get:JsonProperty("date_time") val dateTime : LocalDateTime,
    @get:JsonProperty("close") val close : Double,
    @get:JsonProperty("label_datetime") val labelDateTime : LocalDateTime,
    @get:JsonProperty("real_result") val realResult : Double,
    @get:JsonProperty("prediction") val prediction : Double,
    @get:JsonProperty("error") val error : Double
) {
    companion object {
        private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

        fun listFromDataset(dataset: Dataset<Row>): List<ModelTrainResult> {
            val list = mutableListOf<ModelTrainResult>()
            val iterator = dataset.toLocalIterator()
            while (iterator.hasNext()) {
                val row = iterator.next()
                list.add(fromRow(row))
            }
            return list
        }

        private fun fromRow(row: Row): ModelTrainResult {
            return ModelTrainResult(
                LocalDateTime.parse(row.getString(0), formatter),
                row.getDouble(1),
                LocalDateTime.parse(row.getString(2), formatter),
                row.getDouble(3),
                row.getDouble(4),
                row.getDouble(5)
            )
        }
    }

По моему мнению, второй вариант предпочтительнее для сложных структур, требующих преобразования из типов данных в датасете в целевой тип данных, как, например, дата в примере выше. Но первый вариант так же имеет место быть, его можно использовать для структур, состоящих, к примеру, только из строк.

Заметки о производительности после рассмотрения запросов

Следует понимать, что операции join, groupBy, shuffle (выполняется самим спарком в момент мапинга строк датасета в java pojo, например) — очень дорогие по времени операции, обязательно требующие тюнинга. Спарк — распределенная система, и как всякая распределенная система, требует синхронизации для централизованного получения результата. Датасеты получаются на разных экзекуторах, и во время join операций экзекуторам приходится обмениваться имеющейся у них информацией, что явно влечет затраты на сериализацию, сетевой вызов, десериализацию, дисковый ввод‑вывод и непосредственно сопоставление полученных данных.

Снизить негативное влияние shuffle и join можно, есть очень хорошая статья на эту тему.

В Spark вы пишете код, который преобразует данные, этот код обрабатывается лениво и под капотом преобразуется в план запроса, который материализуется, когда вы вызываете действие, такое как collect () или write (). Spark делит данные на разделы, которые обрабатываются испол нителями, каждый из которых будет обрабатывать набор разделов. Операции, выполняемые в пределах одного раздела, называются узкими операциями и включают в себя такие функции, как map или filter. С другой стороны, агрегации — это широкие операции.которые требуют перемещения данных между узлами, что очень дорого. Сам план запроса может быть двух основных типов: логический план и физический план, который мы обсудим позже.

Следует помнить, что главное правило, касающееся производительности Spark, звучит так: Minimize Data Shuffles. Это вызвано широкими операциями в Spark, такими как соединения или агрегации, которые очень дороги из‑за перетасовки данных.

Привожу еще несколько ссылок на тему тюнинга:

Универсальный совет такой:

Но, к сожалению, зачастую, особенно, в распределенной системе, без shuffle не обойтись.

Обучение модели

Подробно останавливаться на обучении модели не вижу смысла, в предыдущей статье разобраны примеры. В этом примере покажу, как обучал XGBoostRegressor и опишу результаты и найденные проблемы.

trainModel
fun trainModel(
    ticker : String,
    taskNumber : UUID,
    dateStart : LocalDate,
    dateEnd : LocalDate,
    evalPivotPoint : Long,
    offset : Long,
    modelParameters : AnalyticsRequest.ModelParameters
) : ModelTrainResultResponse {
    val pivot = dateEnd.minusDays(evalPivotPoint)

    val tdf = dataReaderService.getDatasetWithLabel(ticker, taskNumber, dateStart, pivot, offset)
    val edf = dataReaderService.getDatasetWithLabel(ticker, taskNumber, pivot, dateEnd, offset)
        .selectExpr(*allColumns)

    val modelParams = createModelParams(modelParameters)
    val regressor = xgBoostRegressor(modelParams)

    val model: PredictionModel<Vector, XGBoostRegressionModel> = regressor.fit(tdf)
    val predictions = model.transform(edf)

    combinedDataRepository.saveData(tdf.selectExpr(*allColumns).unionAll(edf), ticker, taskNumber)
    modelService.saveModel(model, taskNumber, modelParameters)

    val result = predictions.withColumn("error", col("prediction").minus(col(labelName)))
    return ModelTrainResultResponse(ModelTrainResult.listFromDataset(result.selectExpr(*resultExp)))
}

Получаем train и eval датасеты, создаем новую модель, обучаем и сохраняем в postgres. В этом примере используется датасет из таблицы, в которую мы предварительно трансформировали данных в предыдущем разделе статьи.

saveModel
fun saveModel(
    model : PredictionModel<Vector, XGBoostRegressionModel>,
    taskId : UUID,
    modelParameters : AnalyticsRequest.ModelParameters
) {
    val byteArrayOutputStream = ByteArrayOutputStream()
    ObjectOutputStream(byteArrayOutputStream).use { it.writeObject(model) }
    val modelByteArray: ByteArray = byteArrayOutputStream.toByteArray()
    val jsonParams : JsonNode = objectMapper.convertValue(modelParameters, JsonNode::class.java)

    val entity = ModelEntity(modelByteArray, taskId, jsonParams)
    modelRepository.save(entity)
    log.info("Model for task id {} saved. Parameters map: {}, jsonNode: {}",
        taskId, modelParameters, jsonParams)
}

Здесь модель преобразуется в байтовый поток и сохраняется в ячейку с типом данных bytea.

Параметры модели преобразуются в Json и сохраняются в одну таблицу с моделью.

Использование сохраненной модели

Сохраненную модель можно загрузить для дальнейшего использования и/или переобучения.

loadModel
internal inline fun <reified T> loadModel(modelId: Long): T {
    val optional = modelRepository.findById(modelId)

    val entity = optional.get()
    val modelByteArray = entity.model

    val byteArrayInputStream = ByteArrayInputStream(modelByteArray)
    val modelObject = ObjectInputStream(byteArrayInputStream).use { it.readObject() }

    if (modelObject is T) {
        return modelObject
    } else {
        throw ServiceException.withMessage("Model id $modelId has incorrect format")
    }
}

predictWithExistingModel
fun predictWithExistingModel(
    ticker : String,
    taskNumber : UUID,
    dateStart : LocalDate,
    dateEnd : LocalDate,
    modelId : Long
): StockPredictDto {
    val model: PredictionModel<Vector, XGBoostRegressionModel> = modelService.loadModel(modelId)
    val data = dataReaderService.getMainDataset(ticker, taskNumber, dateStart, dateEnd)

    var predictions = model.transform(data)
    predictions = predictions.select("dateTime", "prediction")
    return StockPredictDto.fromDataset(predictions)
}

Проблемы

1. Я пытался реализовать инкрементальное обучение модели XGBoost, но столкнулся с тем, что его реализация на Java (в отличие от Python) не поддерживает данный тип, и модель с каждым инкрементом не апдейтится, а переобучается, поэтому требуется полный набор данных для обучения. Поэтому примера с инкрементальным и потоковым обучением показать в рамках данной статьи не получится, но есть простор для новых исследований.

Ответы разработчиков XGboost по этой теме привожу здесь:

2. В виду того, что данные преобразуются в Double для передачи на GPU, в результирующей таблице наблюдается классическая проблема с хранением типов с плавающей точкой:

способов решения данной проблемы не нашел, нужен дополнительный ресерч.

3. Определение партиций при использовании оконных функций в распределенной системе - если их не определить, то есть вероятность существенной деградации производительности. Тему мало ресерчил, нужно отдельно погружаться и много тестировать. При первом подходе получал не корректные результаты объединения таблиц.

Ладно тебе со своей биг датой, что с ценой акций?

Если вкратце, то результаты так себе (: С наскока тему аналитики движения цены финансовых инструментов не возьмешь, особенно основываясь только на технических индикаторах. В этой предметной области нужно подбирать параметры технических индикаторов, сами индикаторы, и другие данные, исходя из личной торговой стратегии. В данном примере используется по одному индикатору EMA и SMA, хотя в реальном случае их может быть больше, с различными параметрами. Также индикаторы делятся на опережающие и запаздывающие, и нужно исходить из задачи, что нужно получить, чтобы в итоге достичь результата. Дополнительно нужно делать модели, анализирующие фигуры технического анализа и потоки новостей из авторитетных источников, и комбинировать модели под свою задачу.

Особенно плохо получившаяся модель реагирует на «выбросы», так, на скриншоте можно видеть, что последний скачок цены с $305 до $379 предсказать не удалось, да и еще предикт пошел в сторону уменьшения.

Но в целом я результатом доволен: удалось реализовать инфраструктуру, обладающую широкими возможностями горизонтального масштабирования, благодаря Kubernetes, Spark и Cassandra, и при этом есть возможность писать код на Java/Kotlin, которые мне импонируют на порядок больше, чем Python.

Тесты производительности распределенной системы по сравнению с одним инстансом не проводились — возможно, будет темой отдельной статьи. Нужно найти достаточно большой датасет, подобрать параметры модели для приемлемых практических результатов и подготовить тестовые стенды.

Полезные ссылки

About client and cluster modes:

XGboost

NVidia guides

Книги по Machine Learning в финансах

Комментарии (2)


  1. barloc
    03.06.2023 15:20
    +2

    Слишком сложно, а значит дорого, и тем более бесполезно :)
    Даже для обзора - дорого. Кассандра - очень специфичный выбор, особенно для биг даты, с огромной кучей своих подводных камней.

    И даже спарк там особо ничем помочь не сможет :( У меня был опыт как раз чтения котировок из кассандры - это больно, долго и нудно. Обычная реляционка типа pg или mysql окажется гораздо лучше.


    1. Dartya Автор
      03.06.2023 15:20
      +1

      Спасибо за комментарий :)

      Действительно, система крайне сложная для запуска, и кассандра специфичный инструмент, попытка использовать ее как реляционку с помощью спарка - та еще проблема, требующая большого ресерча, и с наскока задачу не решить.

      Но у меня самого на работе была такая задача, и мне пришлось ее решать. В данной статье постарался изложить накопленный опыт. Исходил из того, что если бы мне самому попалась в свое время такая статья, мне было бы намного проще. Надеюсь, кому-нибудь поможет.