Привет, Хабр!

Сегодня я расскажу о разработке кастомного планировщика pod'ов для HPC‑нагрузок в Kubernetes с учётом NUMA и специфичных требований. Рассмотрим код с примером для магазина корма для собачек и всеми нюансами реализации.

Но для начала... зачем вообще нужен кастомный планировщик для HPC?

Kubernetes в своём стандартном виде отлично справляется с большинством задач. Но когда речь идёт о высокопроизводительных вычислениях (HPC), стандартный scheduler может не выдержать требования:

  • Тщательное распределение ресурсов. HPC‑приложения чувствительны к задержкам, фрагментации памяти и неправильному распределению вычислительных ядер.

  • Учёт NUMA‑топологии. В архитектуре NUMA скорость доступа к памяти зависит от её физического расположения относительно процессора — и тут ошибка может стоить дорого.

  • Специфичные требования к CPU, памяти и даже специализированному оборудованию (GPU, RDMA и т. п.).

Поэтому напишем собственный планировщик, который:

  • Отбирает pod'ы с меткой hpc=true.

  • Анализирует аннотации узлов с описанием NUMA‑топологии.

  • Оценивает узлы по доступным ресурсам и специфическим требованиям pod'ов.

  • Надёжно связывает pod с выбранным узлом через Kubernetes API.

Основные функции нашего кастомного HPC-планировщика

Фильтрация pod'ов по меткам HPC

Не все pod'ы требуют особой обработки. Будем отбирать только те, что действительно настроены для HPC, используя метку hpc=true. Таким образом, стандартный scheduler продолжит работать для остальных задач, а наш планировщик займётся только критичными нагрузками.

Анализ NUMA‑топологии узлов

Каждый узел в кластере снабжён аннотацией, описывающей его NUMA‑топологию, например:

[
  {"socketId": 0, "cores": 8, "memory": 32768},
  {"socketId": 1, "cores": 8, "memory": 32768}
]

Это позволяет учитывать не только общее количество ядер и памяти, но и распределение этих ресурсов по сокетам, что критично для HPC‑задач.

Оценка узлов

После фильтрации узлов планировщик оценивает их по нескольким параметрам:

  • Доступность требуемых ядер и памяти.

  • Оптимальное распределение по NUMA‑сокетам.

  • Наличие дополнительных ресурсов (например, GPU).

  • Специфичные требования pod'а, передаваемые через аннотации (например, минимальное число ядер для приложения DogFoodShop).

Безопасное связывание pod»ов с узлами

Наконец, когда выбран наилучший узел, планировщик связывает pod с ним с помощью Kubernetes API. Обработка ошибок на каждом этапе гарантирует, что никакие сбои не останутся незамеченными.

Архитектура и реализация

Будем писать на моем любимом Go, ведь это язык, на котором написан сам Kubernetes. Структура проекта довольно проста:

  • Фильтрация pod'ов: отслеживание событий создания pod'ов с помощью информеров.

  • Оценка узлов: считывание аннотаций узлов, разбор NUMA‑топологии и вычисление баллов.

  • Binding: связывание pod'а с выбранным узлом через API Kubernetes.

Объявление пакета и импорт необходимых библиотек

Объявляем пакет main и подключаем все нужные пакеты. Стандартные пакеты (например, context, encoding/json, flag, log, sort и time) понадобятся для работы с контекстами, JSON, аргументами командной строки, логированием, сортировкой и таймерами. Библиотеки из k8s.io — это мост к Kubernetes API, информерам и настройке клиента.

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

Определение структур: NumaInfo и Scheduler

В этой части кода мы объявляем структуру NumaInfo, которая описывает характеристики одного NUMA‑сокета (идентификатор сокета, количество ядер и объём памяти). Также создаём структуру Scheduler — кастомный планировщик, который будет хранить клиента Kubernetes и информер для отслеживания событий pod'ов.

apiVersion: v1
kind: Pod
metadata:
  name: dogfoodshop-hpc
  labels:
    app: dogfoodshop
    hpc: "true"  # Обязательно для нашего кастомного планировщика!
  annotations:
    dogshop.minCores: "16"  # Минимальное требование к количеству ядер
spec:
  containers:
    - name: dogfoodshop
      image: dogfoodshop/hpc:latest
      resources:
        limits:
          cpu: "16"
          memory: "32Gi"
        requests:
          cpu: "16"
          memory: "32Gi"

Конструктор планировщика

Здесь создаём функцию‑конструктор, которая принимает клиент Kubernetes и создаёт фабрику информеров для отслеживания pod'ов. Именно информер будет следить за событиями создания pod'ов и передавать их нашему планировщику.

apiVersion: v1
kind: Node
metadata:
  name: node-1
  annotations:
    hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'

Запуск планировщика и обработка событий

Функция Run запускает информер и регистрирует обработчик событий. Обработчик onAddPod вызывается при создании нового pod'а. Здесь фильтруем pod'ы по метке hpc=true, чтобы работать только с нужными HPC‑задачами.

// Run запускает планировщик
func (s *Scheduler) Run(stopCh <-chan struct{}) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Recovered from panic: %v", r)
		}
	}()
	log.Println("Запускаем кастомный HPC-планировщик...")
	s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: s.onAddPod,
	})
	s.informer.Run(stopCh)
}

// onAddPod вызывается при создании нового pod’а
func (s *Scheduler) onAddPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	// Фильтруем pod’ы, которые не предназначены для HPC
	if val, ok := pod.Labels["hpc"]; !ok || val != "true" {
		return
	}

	log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name)
	// Пробуем назначить pod на подходящий узел
	err := s.schedulePod(context.Background(), pod)
	if err != nil {
		log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err)
	}
}

Выбор узла для pod’а и связывание

Здесь переходим к «умному» выбору узла: функция schedulePod получает список узлов, фильтрует их по готовности и NUMA‑аннотациям, оценивая их баллами, чтобы выбрать лучший кандидат; при этом filterAndScoreNodes для каждого узла проверяет его готовность, считывает NUMA‑информацию и суммирует количество ядер, учитывая специальные требования (например, для DogFoodShop требуется минимум 16 ядер), а функция isNodeReady просто определяет, находится ли узел в рабочем состоянии; далее getNodeNumaInfo декодирует NUMA‑топологию, сохранённую в аннотациях узла, и, наконец, bindPod создаёт объект binding и связывает pod с выбранным узлом через API.

// schedulePod – логика выбора узла и связывания pod’а с ним
func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error {
	// Получаем список всех узлов
	nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("не удалось получить список узлов: %w", err)
	}

	// Фильтруем узлы с учётом NUMA и ресурсных требований
	candidateNodes := s.filterAndScoreNodes(nodes.Items, pod)
	if len(candidateNodes) == 0 {
		return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name)
	}

	// Выбираем лучший узел (с максимальным баллом)
	bestNode := candidateNodes[0].nodeName
	log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)

	// Выполняем binding pod’а к выбранному узлу
	return s.bindPod(ctx, pod, bestNode)
}

// nodeScore – структура для хранения оценки узла
type nodeScore struct {
	nodeName string
	score    int
}

// filterAndScoreNodes фильтрует узлы и присваивает им баллы
func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore {
	var scores []nodeScore

	for _, node := range nodes {
		// Пропускаем недоступные узлы
		if !isNodeReady(&node) {
			continue
		}

		// Получаем NUMA-информацию из аннотаций узла
		numaInfo, err := getNodeNumaInfo(node)
		if err != nil {
			log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err)
			continue
		}

		// Оцениваем узел: суммируем количество ядер всех сокетов
		totalCores := 0
		for _, socket := range numaInfo {
			totalCores += socket.Cores
		}

		// Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA)
		score := totalCores

		// Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф
		if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 {
			score = 0
		}

		if score > 0 {
			scores = append(scores, nodeScore{
				nodeName: node.Name,
				score:    score,
			})
		}
	}

	// Сортируем узлы по убыванию баллов
	sort.Slice(scores, func(i, j int) bool {
		return scores[i].score > scores[j].score
	})

	return scores
}

// isNodeReady проверяет, находится ли узел в состоянии Ready
func isNodeReady(node *v1.Node) bool {
	for _, condition := range node.Status.Conditions {
		if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла
func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) {
	annotation, ok := node.Annotations["hpc.numa/topology"]
	if !ok {
		return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена")
	}

	var numaInfo []NumaInfo
	err := json.Unmarshal([]byte(annotation), &numaInfo)
	if err != nil {
		return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err)
	}
	return numaInfo, nil
}

// bindPod связывает pod с выбранным узлом
func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error {
	// Создаем объект Binding
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: nodeName,
		},
	}

	// Выполняем вызов API для binding-а
	err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("не удалось выполнить binding: %w", err)
	}
	log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName)
	return nil
}

Функция main

В функции main настраиваем подключение к Kubernetes. Если указан kubeconfig, используем его для локальной разработки, иначе — предполагаем, что код работает внутри кластера (in‑cluster config). Затем создаём клиента, инициализируем планировщик и запускаем его в отдельном горутине. В конце блокируем выполнение, чтобы приложение не завершилось.

func main() {
	// Читаем параметры командной строки
	kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла")
	flag.Parse()

	var config *rest.Config
	var err error
	if *kubeconfig != "" {
		// Локальная разработка: используем kubeconfig
		config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
		if err != nil {
			log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err)
		}
	} else {
		// На продакшене: предполагаем работу внутри кластера
		config, err = rest.InClusterConfig()
		if err != nil {
			log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Ошибка создания клиента Kubernetes: %v", err)
	}

	scheduler := NewScheduler(clientset)
	stopCh := make(chan struct{})
	go scheduler.Run(stopCh)

	// Блокируем выполнение, чтобы приложение не завершалось
	select {}
}
Полный код в спойлере
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// NumaInfo описывает характеристики одного NUMA-сокета
type NumaInfo struct {
	SocketID int `json:"socketId"`
	Cores    int `json:"cores"`
	Memory   int `json:"memory"` // в мегабайтах
}

// Scheduler – структура нашего кастомного планировщика
type Scheduler struct {
	clientset kubernetes.Interface
	informer  cache.SharedIndexInformer
}

// NewScheduler создаёт новый экземпляр Scheduler
func NewScheduler(clientset kubernetes.Interface) *Scheduler {
	// Создаём фабрику информеров для отслеживания pod’ов
	factory := informers.NewSharedInformerFactory(clientset, time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	return &Scheduler{
		clientset: clientset,
		informer:  podInformer,
	}
}

// Run запускает планировщик
func (s *Scheduler) Run(stopCh <-chan struct{}) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Recovered from panic: %v", r)
		}
	}()
	log.Println("Запускаем кастомный HPC-планировщик...")
	s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: s.onAddPod,
	})
	s.informer.Run(stopCh)
}

// onAddPod вызывается при создании нового pod’а
func (s *Scheduler) onAddPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	// Фильтруем pod’ы, которые не предназначены для HPC
	if val, ok := pod.Labels["hpc"]; !ok || val != "true" {
		return
	}

	log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name)
	// Пробуем назначить pod на подходящий узел
	err := s.schedulePod(context.Background(), pod)
	if err != nil {
		log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err)
	}
}

// schedulePod – логика выбора узла и связывания pod’а с ним
func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error {
	// Получаем список всех узлов
	nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("не удалось получить список узлов: %w", err)
	}

	// Фильтруем узлы с учётом NUMA и ресурсных требований
	candidateNodes := s.filterAndScoreNodes(nodes.Items, pod)
	if len(candidateNodes) == 0 {
		return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name)
	}

	// Выбираем лучший узел (с максимальным баллом)
	bestNode := candidateNodes[0].nodeName
	log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)

	// Выполняем binding pod’а к выбранному узлу
	return s.bindPod(ctx, pod, bestNode)
}

// nodeScore – структура для хранения оценки узла
type nodeScore struct {
	nodeName string
	score    int
}

// filterAndScoreNodes фильтрует узлы и присваивает им баллы
func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore {
	var scores []nodeScore

	for _, node := range nodes {
		// Пропускаем недоступные узлы
		if !isNodeReady(&node) {
			continue
		}

		// Получаем NUMA-информацию из аннотаций узла
		numaInfo, err := getNodeNumaInfo(node)
		if err != nil {
			log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err)
			continue
		}

		// Оцениваем узел: суммируем количество ядер всех сокетов
		totalCores := 0
		for _, socket := range numaInfo {
			totalCores += socket.Cores
		}

		// Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA)
		score := totalCores

		// Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф
		if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 {
			score = 0
		}

		if score > 0 {
			scores = append(scores, nodeScore{
				nodeName: node.Name,
				score:    score,
			})
		}
	}

	// Сортируем узлы по убыванию баллов
	sort.Slice(scores, func(i, j int) bool {
		return scores[i].score > scores[j].score
	})

	return scores
}

// isNodeReady проверяет, находится ли узел в состоянии Ready
func isNodeReady(node *v1.Node) bool {
	for _, condition := range node.Status.Conditions {
		if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла
func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) {
	annotation, ok := node.Annotations["hpc.numa/topology"]
	if !ok {
		return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена")
	}

	var numaInfo []NumaInfo
	err := json.Unmarshal([]byte(annotation), &numaInfo)
	if err != nil {
		return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err)
	}
	return numaInfo, nil
}

// bindPod связывает pod с выбранным узлом
func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error {
	// Создаем объект Binding
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: nodeName,
		},
	}

	// Выполняем вызов API для binding-а
	err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("не удалось выполнить binding: %w", err)
	}
	log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName)
	return nil
}

func main() {
	// Читаем параметры командной строки
	kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла")
	flag.Parse()

	var config *rest.Config
	var err error
	if *kubeconfig != "" {
		// Локальная разработка: используем kubeconfig
		config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
		if err != nil {
			log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err)
		}
	} else {
		// На продакшене: предполагаем работу внутри кластера
		config, err = rest.InClusterConfig()
		if err != nil {
			log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Ошибка создания клиента Kubernetes: %v", err)
	}

	scheduler := NewScheduler(clientset)
	stopCh := make(chan struct{})
	go scheduler.Run(stopCh)

	// Блокируем выполнение, чтобы приложение не завершалось
	select {}
}

Пример: магазин корма для собачек с HPC-нагрузками

Представим, что есть приложение для магазина корма для собачек — DogFoodShop. Манифест для pod'а DogFoodShop:

apiVersion: v1
kind: Pod
metadata:
  name: dogfoodshop-hpc
  labels:
    app: dogfoodshop
    hpc: "true"  # Обязательно для нашего кастомного планировщика!
  annotations:
    dogshop.minCores: "16"  # Минимальное требование к количеству ядер
spec:
  containers:
    - name: dogfoodshop
      image: dogfoodshop/hpc:latest
      resources:
        limits:
          cpu: "16"
          memory: "32Gi"
        requests:
          cpu: "16"
          memory: "32Gi"

Для корректной работы scheduler ожидает, что узлы будут иметь аннотации с NUMA‑топологией, например:

apiVersion: v1
kind: Node
metadata:
  name: node-1
  annotations:
    hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'

Работали ли вы с чем‑то подобным? Делитесь в комментариях.

Уже сегодня, 20 февраля в 20:00, пройдёт открытый урок, посвящённый CI/CD. Всего за 100 секунд вы увидите, как можно перейти от пустого проекта к работающей CI/CD-платформе. После этого мы разберём каждый этап: создание пайплайнов, настройку тестирования, автоматический деплой, обработку ошибок и масштабирование.

Записаться можно на странице курса «Инфраструктурная платформа на основе Kubernetes».

Комментарии (0)