Привет, Хабр!
Сегодня я расскажу о разработке кастомного планировщика pod'ов для HPC‑нагрузок в Kubernetes с учётом NUMA и специфичных требований. Рассмотрим код с примером для магазина корма для собачек и всеми нюансами реализации.
Но для начала... зачем вообще нужен кастомный планировщик для HPC?
Kubernetes в своём стандартном виде отлично справляется с большинством задач. Но когда речь идёт о высокопроизводительных вычислениях (HPC), стандартный scheduler может не выдержать требования:
Тщательное распределение ресурсов. HPC‑приложения чувствительны к задержкам, фрагментации памяти и неправильному распределению вычислительных ядер.
Учёт NUMA‑топологии. В архитектуре NUMA скорость доступа к памяти зависит от её физического расположения относительно процессора — и тут ошибка может стоить дорого.
Специфичные требования к CPU, памяти и даже специализированному оборудованию (GPU, RDMA и т. п.).
Поэтому напишем собственный планировщик, который:
Отбирает pod'ы с меткой hpc=true.
Анализирует аннотации узлов с описанием NUMA‑топологии.
Оценивает узлы по доступным ресурсам и специфическим требованиям pod'ов.
Надёжно связывает pod с выбранным узлом через Kubernetes API.
Основные функции нашего кастомного HPC-планировщика
Фильтрация pod'ов по меткам HPC
Не все pod'ы требуют особой обработки. Будем отбирать только те, что действительно настроены для HPC, используя метку hpc=true. Таким образом, стандартный scheduler продолжит работать для остальных задач, а наш планировщик займётся только критичными нагрузками.
Анализ NUMA‑топологии узлов
Каждый узел в кластере снабжён аннотацией, описывающей его NUMA‑топологию, например:
[
{"socketId": 0, "cores": 8, "memory": 32768},
{"socketId": 1, "cores": 8, "memory": 32768}
]
Это позволяет учитывать не только общее количество ядер и памяти, но и распределение этих ресурсов по сокетам, что критично для HPC‑задач.
Оценка узлов
После фильтрации узлов планировщик оценивает их по нескольким параметрам:
Доступность требуемых ядер и памяти.
Оптимальное распределение по NUMA‑сокетам.
Наличие дополнительных ресурсов (например, GPU).
Специфичные требования pod'а, передаваемые через аннотации (например, минимальное число ядер для приложения DogFoodShop).
Безопасное связывание pod»ов с узлами
Наконец, когда выбран наилучший узел, планировщик связывает pod с ним с помощью Kubernetes API. Обработка ошибок на каждом этапе гарантирует, что никакие сбои не останутся незамеченными.
Архитектура и реализация
Будем писать на моем любимом Go, ведь это язык, на котором написан сам Kubernetes. Структура проекта довольно проста:
Фильтрация pod'ов: отслеживание событий создания pod'ов с помощью информеров.
Оценка узлов: считывание аннотаций узлов, разбор NUMA‑топологии и вычисление баллов.
Binding: связывание pod'а с выбранным узлом через API Kubernetes.
Объявление пакета и импорт необходимых библиотек
Объявляем пакет main и подключаем все нужные пакеты. Стандартные пакеты (например, context, encoding/json, flag, log, sort и time) понадобятся для работы с контекстами, JSON, аргументами командной строки, логированием, сортировкой и таймерами. Библиотеки из k8s.io — это мост к Kubernetes API, информерам и настройке клиента.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"sort"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
Определение структур: NumaInfo и Scheduler
В этой части кода мы объявляем структуру NumaInfo, которая описывает характеристики одного NUMA‑сокета (идентификатор сокета, количество ядер и объём памяти). Также создаём структуру Scheduler — кастомный планировщик, который будет хранить клиента Kubernetes и информер для отслеживания событий pod'ов.
apiVersion: v1
kind: Pod
metadata:
name: dogfoodshop-hpc
labels:
app: dogfoodshop
hpc: "true" # Обязательно для нашего кастомного планировщика!
annotations:
dogshop.minCores: "16" # Минимальное требование к количеству ядер
spec:
containers:
- name: dogfoodshop
image: dogfoodshop/hpc:latest
resources:
limits:
cpu: "16"
memory: "32Gi"
requests:
cpu: "16"
memory: "32Gi"
Конструктор планировщика
Здесь создаём функцию‑конструктор, которая принимает клиент Kubernetes и создаёт фабрику информеров для отслеживания pod'ов. Именно информер будет следить за событиями создания pod'ов и передавать их нашему планировщику.
apiVersion: v1
kind: Node
metadata:
name: node-1
annotations:
hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'
Запуск планировщика и обработка событий
Функция Run запускает информер и регистрирует обработчик событий. Обработчик onAddPod
вызывается при создании нового pod'а. Здесь фильтруем pod'ы по метке hpc=true
, чтобы работать только с нужными HPC‑задачами.
// Run запускает планировщик
func (s *Scheduler) Run(stopCh <-chan struct{}) {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic: %v", r)
}
}()
log.Println("Запускаем кастомный HPC-планировщик...")
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onAddPod,
})
s.informer.Run(stopCh)
}
// onAddPod вызывается при создании нового pod’а
func (s *Scheduler) onAddPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}
// Фильтруем pod’ы, которые не предназначены для HPC
if val, ok := pod.Labels["hpc"]; !ok || val != "true" {
return
}
log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name)
// Пробуем назначить pod на подходящий узел
err := s.schedulePod(context.Background(), pod)
if err != nil {
log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err)
}
}
Выбор узла для pod’а и связывание
Здесь переходим к «умному» выбору узла: функция schedulePod
получает список узлов, фильтрует их по готовности и NUMA‑аннотациям, оценивая их баллами, чтобы выбрать лучший кандидат; при этом filterAndScoreNodes
для каждого узла проверяет его готовность, считывает NUMA‑информацию и суммирует количество ядер, учитывая специальные требования (например, для DogFoodShop
требуется минимум 16 ядер), а функция isNodeReady
просто определяет, находится ли узел в рабочем состоянии; далее getNodeNumaInfo
декодирует NUMA‑топологию, сохранённую в аннотациях узла, и, наконец, bindPod
создаёт объект binding и связывает pod с выбранным узлом через API.
// schedulePod – логика выбора узла и связывания pod’а с ним
func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error {
// Получаем список всех узлов
nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("не удалось получить список узлов: %w", err)
}
// Фильтруем узлы с учётом NUMA и ресурсных требований
candidateNodes := s.filterAndScoreNodes(nodes.Items, pod)
if len(candidateNodes) == 0 {
return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name)
}
// Выбираем лучший узел (с максимальным баллом)
bestNode := candidateNodes[0].nodeName
log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)
// Выполняем binding pod’а к выбранному узлу
return s.bindPod(ctx, pod, bestNode)
}
// nodeScore – структура для хранения оценки узла
type nodeScore struct {
nodeName string
score int
}
// filterAndScoreNodes фильтрует узлы и присваивает им баллы
func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore {
var scores []nodeScore
for _, node := range nodes {
// Пропускаем недоступные узлы
if !isNodeReady(&node) {
continue
}
// Получаем NUMA-информацию из аннотаций узла
numaInfo, err := getNodeNumaInfo(node)
if err != nil {
log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err)
continue
}
// Оцениваем узел: суммируем количество ядер всех сокетов
totalCores := 0
for _, socket := range numaInfo {
totalCores += socket.Cores
}
// Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA)
score := totalCores
// Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф
if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 {
score = 0
}
if score > 0 {
scores = append(scores, nodeScore{
nodeName: node.Name,
score: score,
})
}
}
// Сортируем узлы по убыванию баллов
sort.Slice(scores, func(i, j int) bool {
return scores[i].score > scores[j].score
})
return scores
}
// isNodeReady проверяет, находится ли узел в состоянии Ready
func isNodeReady(node *v1.Node) bool {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
return true
}
}
return false
}
// getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла
func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) {
annotation, ok := node.Annotations["hpc.numa/topology"]
if !ok {
return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена")
}
var numaInfo []NumaInfo
err := json.Unmarshal([]byte(annotation), &numaInfo)
if err != nil {
return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err)
}
return numaInfo, nil
}
// bindPod связывает pod с выбранным узлом
func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error {
// Создаем объект Binding
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
},
Target: v1.ObjectReference{
Kind: "Node",
Name: nodeName,
},
}
// Выполняем вызов API для binding-а
err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("не удалось выполнить binding: %w", err)
}
log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName)
return nil
}
Функция main
В функции main
настраиваем подключение к Kubernetes. Если указан kubeconfig, используем его для локальной разработки, иначе — предполагаем, что код работает внутри кластера (in‑cluster config). Затем создаём клиента, инициализируем планировщик и запускаем его в отдельном горутине. В конце блокируем выполнение, чтобы приложение не завершилось.
func main() {
// Читаем параметры командной строки
kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла")
flag.Parse()
var config *rest.Config
var err error
if *kubeconfig != "" {
// Локальная разработка: используем kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err)
}
} else {
// На продакшене: предполагаем работу внутри кластера
config, err = rest.InClusterConfig()
if err != nil {
log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err)
}
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Ошибка создания клиента Kubernetes: %v", err)
}
scheduler := NewScheduler(clientset)
stopCh := make(chan struct{})
go scheduler.Run(stopCh)
// Блокируем выполнение, чтобы приложение не завершалось
select {}
}
Полный код в спойлере
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"sort"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// NumaInfo описывает характеристики одного NUMA-сокета
type NumaInfo struct {
SocketID int `json:"socketId"`
Cores int `json:"cores"`
Memory int `json:"memory"` // в мегабайтах
}
// Scheduler – структура нашего кастомного планировщика
type Scheduler struct {
clientset kubernetes.Interface
informer cache.SharedIndexInformer
}
// NewScheduler создаёт новый экземпляр Scheduler
func NewScheduler(clientset kubernetes.Interface) *Scheduler {
// Создаём фабрику информеров для отслеживания pod’ов
factory := informers.NewSharedInformerFactory(clientset, time.Minute)
podInformer := factory.Core().V1().Pods().Informer()
return &Scheduler{
clientset: clientset,
informer: podInformer,
}
}
// Run запускает планировщик
func (s *Scheduler) Run(stopCh <-chan struct{}) {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic: %v", r)
}
}()
log.Println("Запускаем кастомный HPC-планировщик...")
s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onAddPod,
})
s.informer.Run(stopCh)
}
// onAddPod вызывается при создании нового pod’а
func (s *Scheduler) onAddPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}
// Фильтруем pod’ы, которые не предназначены для HPC
if val, ok := pod.Labels["hpc"]; !ok || val != "true" {
return
}
log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name)
// Пробуем назначить pod на подходящий узел
err := s.schedulePod(context.Background(), pod)
if err != nil {
log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err)
}
}
// schedulePod – логика выбора узла и связывания pod’а с ним
func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error {
// Получаем список всех узлов
nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("не удалось получить список узлов: %w", err)
}
// Фильтруем узлы с учётом NUMA и ресурсных требований
candidateNodes := s.filterAndScoreNodes(nodes.Items, pod)
if len(candidateNodes) == 0 {
return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name)
}
// Выбираем лучший узел (с максимальным баллом)
bestNode := candidateNodes[0].nodeName
log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)
// Выполняем binding pod’а к выбранному узлу
return s.bindPod(ctx, pod, bestNode)
}
// nodeScore – структура для хранения оценки узла
type nodeScore struct {
nodeName string
score int
}
// filterAndScoreNodes фильтрует узлы и присваивает им баллы
func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore {
var scores []nodeScore
for _, node := range nodes {
// Пропускаем недоступные узлы
if !isNodeReady(&node) {
continue
}
// Получаем NUMA-информацию из аннотаций узла
numaInfo, err := getNodeNumaInfo(node)
if err != nil {
log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err)
continue
}
// Оцениваем узел: суммируем количество ядер всех сокетов
totalCores := 0
for _, socket := range numaInfo {
totalCores += socket.Cores
}
// Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA)
score := totalCores
// Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф
if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 {
score = 0
}
if score > 0 {
scores = append(scores, nodeScore{
nodeName: node.Name,
score: score,
})
}
}
// Сортируем узлы по убыванию баллов
sort.Slice(scores, func(i, j int) bool {
return scores[i].score > scores[j].score
})
return scores
}
// isNodeReady проверяет, находится ли узел в состоянии Ready
func isNodeReady(node *v1.Node) bool {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
return true
}
}
return false
}
// getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла
func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) {
annotation, ok := node.Annotations["hpc.numa/topology"]
if !ok {
return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена")
}
var numaInfo []NumaInfo
err := json.Unmarshal([]byte(annotation), &numaInfo)
if err != nil {
return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err)
}
return numaInfo, nil
}
// bindPod связывает pod с выбранным узлом
func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error {
// Создаем объект Binding
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
},
Target: v1.ObjectReference{
Kind: "Node",
Name: nodeName,
},
}
// Выполняем вызов API для binding-а
err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("не удалось выполнить binding: %w", err)
}
log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName)
return nil
}
func main() {
// Читаем параметры командной строки
kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла")
flag.Parse()
var config *rest.Config
var err error
if *kubeconfig != "" {
// Локальная разработка: используем kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err)
}
} else {
// На продакшене: предполагаем работу внутри кластера
config, err = rest.InClusterConfig()
if err != nil {
log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err)
}
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Ошибка создания клиента Kubernetes: %v", err)
}
scheduler := NewScheduler(clientset)
stopCh := make(chan struct{})
go scheduler.Run(stopCh)
// Блокируем выполнение, чтобы приложение не завершалось
select {}
}
Пример: магазин корма для собачек с HPC-нагрузками
Представим, что есть приложение для магазина корма для собачек — DogFoodShop. Манифест для pod'а DogFoodShop:
apiVersion: v1
kind: Pod
metadata:
name: dogfoodshop-hpc
labels:
app: dogfoodshop
hpc: "true" # Обязательно для нашего кастомного планировщика!
annotations:
dogshop.minCores: "16" # Минимальное требование к количеству ядер
spec:
containers:
- name: dogfoodshop
image: dogfoodshop/hpc:latest
resources:
limits:
cpu: "16"
memory: "32Gi"
requests:
cpu: "16"
memory: "32Gi"
Для корректной работы scheduler ожидает, что узлы будут иметь аннотации с NUMA‑топологией, например:
apiVersion: v1
kind: Node
metadata:
name: node-1
annotations:
hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'
Работали ли вы с чем‑то подобным? Делитесь в комментариях.
Уже сегодня, 20 февраля в 20:00, пройдёт открытый урок, посвящённый CI/CD. Всего за 100 секунд вы увидите, как можно перейти от пустого проекта к работающей CI/CD-платформе. После этого мы разберём каждый этап: создание пайплайнов, настройку тестирования, автоматический деплой, обработку ошибок и масштабирование.
Записаться можно на странице курса «Инфраструктурная платформа на основе Kubernetes».